diff options
Diffstat (limited to 'cpp/src/qpid/asyncStore')
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.cpp | 363 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.h | 208 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 70 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.h | 2 |
5 files changed, 488 insertions, 157 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp index b8fdb8b140..a22f803fcd 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -23,101 +23,320 @@ #include "AsyncOperation.h" -#include "qpid/Exception.h" +//#include "qpid/Exception.h" +#include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/AsyncResultHandleImpl.h" -#include <sstream> +//#include <sstream> namespace qpid { namespace asyncStore { -AsyncOperation::AsyncOperation() : - m_op(NONE), - m_targetHandle(), - m_dataSrc(0), - m_txnHandle(0) +AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + m_brokerCtxt(brokerCtxt) {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(0), - m_txnHandle(0), - m_brokerCtxt(brokerCtxt) +AsyncOperation::~AsyncOperation() {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(dataSrc), - m_txnHandle(0), - m_brokerCtxt(brokerCtxt) +boost::shared_ptr<qpid::broker::BrokerAsyncContext> AsyncOperation::getBrokerContext() const +{ + return m_brokerCtxt; +} + +void +AsyncOperation::submitResult() +{ + return submitResult(0, ""); +} + +void +AsyncOperation::submitResult(const int errNo, + const std::string& errMsg) +{ + if (m_brokerCtxt.get()) { + qpid::broker::AsyncResultQueue* const arq = m_brokerCtxt->getAsyncResultQueue(); + if (arq) { + qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(errNo, errMsg, m_brokerCtxt); + boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi)); + arq->submit(arh); + } + } +} + + +// --- class AsyncOpTxnPrepare --- + +AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_txnHandle(txnHandle) {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(0), - m_txnHandle(txnHandle), - m_brokerCtxt(brokerCtxt) +AsyncOpTxnPrepare::~AsyncOpTxnPrepare() {} + +void +AsyncOpTxnPrepare::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpTxnPrepare::getOpStr() const { + return "TXN_PREPARE"; +} + + + +// --- class AsyncOpTxnCommit --- + +AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_txnHandle(txnHandle) {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(dataSrc), - m_txnHandle(txnHandle), - m_brokerCtxt(brokerCtxt) +AsyncOpTxnCommit::~AsyncOpTxnCommit() {} + +void +AsyncOpTxnCommit::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpTxnCommit::getOpStr() const { + return "TXN_COMMIT"; +} + + +// --- class AsyncOpTxnAbort --- + +AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_txnHandle(txnHandle) {} -AsyncOperation::~AsyncOperation() +AsyncOpTxnAbort::~AsyncOpTxnAbort() {} + +void +AsyncOpTxnAbort::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpTxnAbort::getOpStr() const { + return "TXN_ABORT"; +} + + +// --- class AsyncOpConfigCreate --- + +AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_cfgHandle(cfgHandle), + m_data(data) {} +AsyncOpConfigCreate::~AsyncOpConfigCreate() {} + +void +AsyncOpConfigCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + const char* -AsyncOperation::getOpStr() const -{ - return getOpStr(m_op); +AsyncOpConfigCreate::getOpStr() const { + return "CONFIG_CREATE"; } -boost::shared_ptr<qpid::broker::BrokerAsyncContext> -AsyncOperation::getBrokerContext() const -{ - return m_brokerCtxt; + +// --- class AsyncOpConfigDestroy --- + +AsyncOpConfigDestroy::AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_cfgHandle(cfgHandle) +{} + +AsyncOpConfigDestroy::~AsyncOpConfigDestroy() {} + +void +AsyncOpConfigDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); } -//static const char* -AsyncOperation::getOpStr(const opCode op) -{ - switch (op) { - case NONE: return "<none>"; - case TXN_PREPARE: return "TXN_PREPARE"; - case TXN_COMMIT: return "TXN_COMMIT"; - case TXN_ABORT: return "TXN_ABORT"; - case CONFIG_CREATE: return "CONFIG_CREATE"; - case CONFIG_DESTROY: return "CONFIG_DESTROY"; - case QUEUE_CREATE: return "QUEUE_CREATE"; - case QUEUE_FLUSH: return "QUEUE_FLUSH"; - case QUEUE_DESTROY: return "QUEUE_DESTROY"; - case EVENT_CREATE: return "EVENT_CREATE"; - case EVENT_DESTROY: return "EVENT_DESTROY"; - case MSG_ENQUEUE: return "MSG_ENQUEUE"; - case MSG_DEQUEUE: return "MSG_DEQUEUE"; - } - std::ostringstream oss; - oss << "AsyncStore: AsyncOperation::getOpStr(): Unknown op-code \"" << op << "\""; - throw qpid::Exception(oss.str()); +AsyncOpConfigDestroy::getOpStr() const { + return "CONFIG_DESTROY"; +} + + +// --- class AsyncOpQueueCreate --- + +AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_queueHandle(queueHandle), + m_data(data) +{} + +AsyncOpQueueCreate::~AsyncOpQueueCreate() {} + +void +AsyncOpQueueCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpQueueCreate::getOpStr() const { + return "QUEUE_CREATE"; +} + + +// --- class AsyncOpQueueFlush --- + +AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_queueHandle(queueHandle) +{} + +AsyncOpQueueFlush::~AsyncOpQueueFlush() {} + +void +AsyncOpQueueFlush::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpQueueFlush::getOpStr() const { + return "QUEUE_FLUSH"; +} + + +// --- class AsyncOpQueueDestroy --- + +AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_queueHandle(queueHandle) +{} + +AsyncOpQueueDestroy::~AsyncOpQueueDestroy() {} + +void +AsyncOpQueueDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpQueueDestroy::getOpStr() const { + return "QUEUE_DESTROY"; +} + + +// --- class AsyncOpEventCreate --- + +AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle, + const qpid::broker::DataSource* const data, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_evtHandle(evtHandle), + m_data(data), + m_txnHandle(txnHandle) +{} + +AsyncOpEventCreate::~AsyncOpEventCreate() {} + +void +AsyncOpEventCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpEventCreate::getOpStr() const { + return "EVENT_CREATE"; +} + + +// --- class AsyncOpEventDestroy --- + +AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_evtHandle(evtHandle), + m_txnHandle(txnHandle) +{} + +AsyncOpEventDestroy::~AsyncOpEventDestroy() {} + +void +AsyncOpEventDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpEventDestroy::getOpStr() const { + return "EVENT_DESTROY"; +} + + +// --- class AsyncOpMsgEnqueue --- + +AsyncOpMsgEnqueue::AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_enqHandle(enqHandle), + m_txnHandle(txnHandle) +{} + +AsyncOpMsgEnqueue::~AsyncOpMsgEnqueue() {} + +void AsyncOpMsgEnqueue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* AsyncOpMsgEnqueue::getOpStr() const { + return "MSG_ENQUEUE"; +} + + +// --- class AsyncOpMsgDequeue --- + +AsyncOpMsgDequeue::AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_enqHandle(enqHandle), + m_txnHandle(txnHandle) +{} + +AsyncOpMsgDequeue::~AsyncOpMsgDequeue() {} + +void AsyncOpMsgDequeue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* AsyncOpMsgDequeue::getOpStr() const { + return "MSG_DEQUEUE"; } }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h index cb73ad639b..2b195d7443 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.h +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -26,57 +26,185 @@ #include "qpid/broker/AsyncStore.h" +#include <boost/shared_ptr.hpp> + namespace qpid { namespace asyncStore { -class AsyncStoreHandle; +class AsyncStoreImpl; class AsyncOperation { public: - typedef enum {NONE=0, - TXN_PREPARE, - TXN_COMMIT, - TXN_ABORT, - CONFIG_CREATE, - CONFIG_DESTROY, - QUEUE_CREATE, - QUEUE_FLUSH, - QUEUE_DESTROY, - EVENT_CREATE, - EVENT_DESTROY, - MSG_ENQUEUE, - MSG_DEQUEUE - } opCode; - - AsyncOperation(); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); virtual ~AsyncOperation(); - const char* getOpStr() const; - static const char* getOpStr(const opCode op); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store) = 0; boost::shared_ptr<qpid::broker::BrokerAsyncContext> getBrokerContext() const; - + virtual const char* getOpStr() const = 0; +protected: + void submitResult(); + void submitResult(const int errNo, + const std::string& errMsg); private: - opCode m_op; - const AsyncStoreHandle* m_targetHandle; - const qpid::broker::DataSource* const m_dataSrc; - const qpid::broker::TxnHandle* m_txnHandle; boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt; }; + +class AsyncOpTxnPrepare: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpTxnPrepare(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpTxnCommit: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpTxnCommit(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpTxnAbort: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpTxnAbort(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpConfigCreate(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::ConfigHandle& m_cfgHandle; + const qpid::broker::DataSource* const m_data; +}; + + +class AsyncOpConfigDestroy: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpConfigDestroy(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::ConfigHandle& m_cfgHandle; +}; + + +class AsyncOpQueueCreate: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpQueueCreate(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::QueueHandle& m_queueHandle; + const qpid::broker::DataSource* const m_data; +}; + + +class AsyncOpQueueFlush: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpQueueFlush(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::QueueHandle& m_queueHandle; +}; + + +class AsyncOpQueueDestroy: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpQueueDestroy(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::QueueHandle& m_queueHandle; +}; + + +class AsyncOpEventCreate: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle, + const qpid::broker::DataSource* const data, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpEventCreate(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EventHandle& m_evtHandle; + const qpid::broker::DataSource* const m_data; + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpEventDestroy: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpEventDestroy(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EventHandle& m_evtHandle; + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpMsgEnqueue: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpMsgEnqueue(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EnqueueHandle& m_enqHandle; + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpMsgDequeue: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpMsgDequeue(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EnqueueHandle& m_enqHandle; + qpid::broker::TxnHandle& m_txnHandle; +}; + }} // namespace qpid::asyncStore #endif // qpid_asyncStore_AsyncOperation_h_ diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 6b5c3ac582..e8379b95e2 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -23,6 +23,7 @@ #include "AsyncStoreImpl.h" +#include "AsyncOperation.h" #include "ConfigHandleImpl.h" #include "EnqueueHandleImpl.h" #include "EventHandleImpl.h" @@ -37,6 +38,8 @@ #include "qpid/broker/QueueHandle.h" #include "qpid/broker/TxnHandle.h" +//#include <boost/make_shared.hpp> + namespace qpid { namespace asyncStore { @@ -94,9 +97,8 @@ void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, - dynamic_cast<AsyncStoreHandle*>(&txnHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -104,9 +106,8 @@ void AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, - dynamic_cast<AsyncStoreHandle*>(&txnHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -114,9 +115,8 @@ void AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, - dynamic_cast<AsyncStoreHandle*>(&txnHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -161,10 +161,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE, - dynamic_cast<AsyncStoreHandle*>(&cfgHandle), - dataSrc, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -172,9 +170,8 @@ void AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY, - dynamic_cast<AsyncStoreHandle*>(&cfgHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -183,10 +180,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE, - dynamic_cast<AsyncStoreHandle*>(&queueHandle), - dataSrc, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -194,9 +189,8 @@ void AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY, - dynamic_cast<AsyncStoreHandle*>(&queueHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -204,9 +198,8 @@ void AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH, - dynamic_cast<AsyncStoreHandle*>(&queueHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -216,11 +209,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<AsyncStoreHandle*>(&eventHandle), - dataSrc, - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -229,10 +219,8 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<AsyncStoreHandle*>(&eventHandle), - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -241,10 +229,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<AsyncStoreHandle*>(&enqHandle), - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -253,10 +239,8 @@ AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<AsyncStoreHandle*>(&enqHandle), - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index dd4e7c1343..b7b8970c39 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -54,7 +54,7 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) try { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext(); - if (bc) { + if (bc.get()) { qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); if (arq) { qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h index 143ac2ab0c..473b5f1cfb 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.h +++ b/cpp/src/qpid/asyncStore/OperationQueue.h @@ -26,7 +26,6 @@ #include "AsyncOperation.h" -//#include "qpid/broker/AsyncStore.h" #include "qpid/sys/PollableQueue.h" namespace qpid { @@ -43,6 +42,7 @@ private: typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncOperation> > OpQueue; OpQueue m_opQueue; + // Callback function for pollable queue, defined in qpid::sys::PollableQueue OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e); }; |