summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/asyncStore
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/asyncStore')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.cpp31
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.h12
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp52
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h20
4 files changed, 58 insertions, 57 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
index a22f803fcd..0e3586854c 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
@@ -23,11 +23,10 @@
#include "AsyncOperation.h"
-//#include "qpid/Exception.h"
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/AsyncResultHandleImpl.h"
-
-//#include <sstream>
+#include "qpid/broker/QueueAsyncContext.h"
+#include "qpid/broker/TxnAsyncContext.h"
namespace qpid {
namespace asyncStore {
@@ -68,8 +67,8 @@ AsyncOperation::submitResult(const int errNo,
// --- class AsyncOpTxnPrepare ---
AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)),
m_txnHandle(txnHandle)
{}
@@ -91,8 +90,8 @@ AsyncOpTxnPrepare::getOpStr() const {
// --- class AsyncOpTxnCommit ---
AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)),
m_txnHandle(txnHandle)
{}
@@ -113,8 +112,8 @@ AsyncOpTxnCommit::getOpStr() const {
// --- class AsyncOpTxnAbort ---
AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)),
m_txnHandle(txnHandle)
{}
@@ -181,9 +180,9 @@ AsyncOpConfigDestroy::getOpStr() const {
// --- class AsyncOpQueueCreate ---
AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
- const qpid::broker::DataSource* const data,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)),
m_queueHandle(queueHandle),
m_data(data)
{}
@@ -205,8 +204,8 @@ AsyncOpQueueCreate::getOpStr() const {
// --- class AsyncOpQueueFlush ---
AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)),
m_queueHandle(queueHandle)
{}
@@ -227,8 +226,8 @@ AsyncOpQueueFlush::getOpStr() const {
// --- class AsyncOpQueueDestroy ---
AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)),
m_queueHandle(queueHandle)
{}
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h
index 2b195d7443..2894816ca4 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.h
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.h
@@ -51,7 +51,7 @@ private:
class AsyncOpTxnPrepare: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt);
virtual ~AsyncOpTxnPrepare();
virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
virtual const char* getOpStr() const;
@@ -63,7 +63,7 @@ private:
class AsyncOpTxnCommit: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt);
virtual ~AsyncOpTxnCommit();
virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
virtual const char* getOpStr() const;
@@ -75,7 +75,7 @@ private:
class AsyncOpTxnAbort: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt);
virtual ~AsyncOpTxnAbort();
virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
virtual const char* getOpStr() const;
@@ -114,7 +114,7 @@ class AsyncOpQueueCreate: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const data,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt);
virtual ~AsyncOpQueueCreate();
virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
virtual const char* getOpStr() const;
@@ -127,7 +127,7 @@ private:
class AsyncOpQueueFlush: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt);
virtual ~AsyncOpQueueFlush();
virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
virtual const char* getOpStr() const;
@@ -139,7 +139,7 @@ private:
class AsyncOpQueueDestroy: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt);
virtual ~AsyncOpQueueDestroy();
virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
virtual const char* getOpStr() const;
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index e8379b95e2..4aeab4c7bf 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -35,11 +35,11 @@
#include "qpid/broker/EnqueueHandle.h"
#include "qpid/broker/EventHandle.h"
#include "qpid/broker/MessageHandle.h"
+#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/QueueHandle.h"
+#include "qpid/broker/TxnAsyncContext.h"
#include "qpid/broker/TxnHandle.h"
-//#include <boost/make_shared.hpp>
-
namespace qpid {
namespace asyncStore {
@@ -95,28 +95,28 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid,
void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+ boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, brokerCtxt));
- brokerCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt));
+ TxnCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, brokerCtxt));
- brokerCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt));
+ TxnCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, brokerCtxt));
- brokerCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt));
+ TxnCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -178,28 +178,28 @@ AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
void
AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, brokerCtxt));
- brokerCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt));
+ QueueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, brokerCtxt));
- brokerCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt));
+ QueueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, brokerCtxt));
- brokerCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt));
+ QueueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -227,20 +227,20 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
void
AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, brokerCtxt));
- brokerCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt));
+ QueueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, brokerCtxt));
- brokerCtxt->setOpStr(op->getOpStr());
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt));
+ QueueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 7dee03dc6d..3e29039aea 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -42,7 +42,9 @@ class Poller;
namespace asyncStore {
-class AsyncStoreImpl : public qpid::broker::AsyncStore {
+class AsyncStoreImpl : public qpid::broker::AsyncTransactionalStore,
+ public qpid::broker::AsyncStore
+{
public:
AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
const AsyncStoreOptions& opts);
@@ -63,11 +65,11 @@ public:
qpid::broker::TxnBuffer* tb);
void submitPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt);
void submitCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt);
void submitAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt);
// --- Interface from AsyncStore ---
@@ -89,11 +91,11 @@ public:
void submitCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt);
void submitDestroy(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt);
void submitFlush(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt);
void submitCreate(qpid::broker::EventHandle& eventHandle,
const qpid::broker::DataSource* const dataSrc,
@@ -105,10 +107,10 @@ public:
void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt);
void submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt);
// Legacy - Restore FTD message, is NOT async!
virtual int loadContent(qpid::broker::MessageHandle& msgHandle,