diff options
Diffstat (limited to 'cpp/src/qpid/asyncStore')
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 52 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.h | 20 |
4 files changed, 58 insertions, 57 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp index a22f803fcd..0e3586854c 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -23,11 +23,10 @@ #include "AsyncOperation.h" -//#include "qpid/Exception.h" #include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/AsyncResultHandleImpl.h" - -//#include <sstream> +#include "qpid/broker/QueueAsyncContext.h" +#include "qpid/broker/TxnAsyncContext.h" namespace qpid { namespace asyncStore { @@ -68,8 +67,8 @@ AsyncOperation::submitResult(const int errNo, // --- class AsyncOpTxnPrepare --- AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)), m_txnHandle(txnHandle) {} @@ -91,8 +90,8 @@ AsyncOpTxnPrepare::getOpStr() const { // --- class AsyncOpTxnCommit --- AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)), m_txnHandle(txnHandle) {} @@ -113,8 +112,8 @@ AsyncOpTxnCommit::getOpStr() const { // --- class AsyncOpTxnAbort --- AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)), m_txnHandle(txnHandle) {} @@ -181,9 +180,9 @@ AsyncOpConfigDestroy::getOpStr() const { // --- class AsyncOpQueueCreate --- AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, - const qpid::broker::DataSource* const data, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)), m_queueHandle(queueHandle), m_data(data) {} @@ -205,8 +204,8 @@ AsyncOpQueueCreate::getOpStr() const { // --- class AsyncOpQueueFlush --- AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)), m_queueHandle(queueHandle) {} @@ -227,8 +226,8 @@ AsyncOpQueueFlush::getOpStr() const { // --- class AsyncOpQueueDestroy --- AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)), m_queueHandle(queueHandle) {} diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h index 2b195d7443..2894816ca4 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.h +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -51,7 +51,7 @@ private: class AsyncOpTxnPrepare: public qpid::asyncStore::AsyncOperation { public: AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt); virtual ~AsyncOpTxnPrepare(); virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); virtual const char* getOpStr() const; @@ -63,7 +63,7 @@ private: class AsyncOpTxnCommit: public qpid::asyncStore::AsyncOperation { public: AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt); virtual ~AsyncOpTxnCommit(); virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); virtual const char* getOpStr() const; @@ -75,7 +75,7 @@ private: class AsyncOpTxnAbort: public qpid::asyncStore::AsyncOperation { public: AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt); virtual ~AsyncOpTxnAbort(); virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); virtual const char* getOpStr() const; @@ -114,7 +114,7 @@ class AsyncOpQueueCreate: public qpid::asyncStore::AsyncOperation { public: AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const data, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt); virtual ~AsyncOpQueueCreate(); virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); virtual const char* getOpStr() const; @@ -127,7 +127,7 @@ private: class AsyncOpQueueFlush: public qpid::asyncStore::AsyncOperation { public: AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt); virtual ~AsyncOpQueueFlush(); virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); virtual const char* getOpStr() const; @@ -139,7 +139,7 @@ private: class AsyncOpQueueDestroy: public qpid::asyncStore::AsyncOperation { public: AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt); virtual ~AsyncOpQueueDestroy(); virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); virtual const char* getOpStr() const; diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index e8379b95e2..4aeab4c7bf 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -35,11 +35,11 @@ #include "qpid/broker/EnqueueHandle.h" #include "qpid/broker/EventHandle.h" #include "qpid/broker/MessageHandle.h" +#include "qpid/broker/QueueAsyncContext.h" #include "qpid/broker/QueueHandle.h" +#include "qpid/broker/TxnAsyncContext.h" #include "qpid/broker/TxnHandle.h" -//#include <boost/make_shared.hpp> - namespace qpid { namespace asyncStore { @@ -95,28 +95,28 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid, void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) + boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, brokerCtxt)); - brokerCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt)); + TxnCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) + boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, brokerCtxt)); - brokerCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt)); + TxnCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) + boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, brokerCtxt)); - brokerCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt)); + TxnCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -178,28 +178,28 @@ AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, void AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) + boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, brokerCtxt)); - brokerCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt)); + QueueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) + boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, brokerCtxt)); - brokerCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt)); + QueueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) + boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, brokerCtxt)); - brokerCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt)); + QueueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -227,20 +227,20 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) + boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, brokerCtxt)); - brokerCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt)); + QueueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } void AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) + boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, brokerCtxt)); - brokerCtxt->setOpStr(op->getOpStr()); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt)); + QueueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 7dee03dc6d..3e29039aea 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -42,7 +42,9 @@ class Poller; namespace asyncStore { -class AsyncStoreImpl : public qpid::broker::AsyncStore { +class AsyncStoreImpl : public qpid::broker::AsyncTransactionalStore, + public qpid::broker::AsyncStore +{ public: AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, const AsyncStoreOptions& opts); @@ -63,11 +65,11 @@ public: qpid::broker::TxnBuffer* tb); void submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt); void submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt); void submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt); // --- Interface from AsyncStore --- @@ -89,11 +91,11 @@ public: void submitCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt); void submitDestroy(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt); void submitFlush(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt); void submitCreate(qpid::broker::EventHandle& eventHandle, const qpid::broker::DataSource* const dataSrc, @@ -105,10 +107,10 @@ public: void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt); void submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt); // Legacy - Restore FTD message, is NOT async! virtual int loadContent(qpid::broker::MessageHandle& msgHandle, |