diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.cpp | 363 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.h | 208 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 70 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AsyncResultQueueImpl.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AsyncStore.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnAsyncContext.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnAsyncContext.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnBuffer.cpp | 57 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnBuffer.h | 3 |
11 files changed, 534 insertions, 217 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp index b8fdb8b140..a22f803fcd 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -23,101 +23,320 @@ #include "AsyncOperation.h" -#include "qpid/Exception.h" +//#include "qpid/Exception.h" +#include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/AsyncResultHandleImpl.h" -#include <sstream> +//#include <sstream> namespace qpid { namespace asyncStore { -AsyncOperation::AsyncOperation() : - m_op(NONE), - m_targetHandle(), - m_dataSrc(0), - m_txnHandle(0) +AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + m_brokerCtxt(brokerCtxt) {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(0), - m_txnHandle(0), - m_brokerCtxt(brokerCtxt) +AsyncOperation::~AsyncOperation() {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(dataSrc), - m_txnHandle(0), - m_brokerCtxt(brokerCtxt) +boost::shared_ptr<qpid::broker::BrokerAsyncContext> AsyncOperation::getBrokerContext() const +{ + return m_brokerCtxt; +} + +void +AsyncOperation::submitResult() +{ + return submitResult(0, ""); +} + +void +AsyncOperation::submitResult(const int errNo, + const std::string& errMsg) +{ + if (m_brokerCtxt.get()) { + qpid::broker::AsyncResultQueue* const arq = m_brokerCtxt->getAsyncResultQueue(); + if (arq) { + qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(errNo, errMsg, m_brokerCtxt); + boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi)); + arq->submit(arh); + } + } +} + + +// --- class AsyncOpTxnPrepare --- + +AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_txnHandle(txnHandle) {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(0), - m_txnHandle(txnHandle), - m_brokerCtxt(brokerCtxt) +AsyncOpTxnPrepare::~AsyncOpTxnPrepare() {} + +void +AsyncOpTxnPrepare::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpTxnPrepare::getOpStr() const { + return "TXN_PREPARE"; +} + + + +// --- class AsyncOpTxnCommit --- + +AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_txnHandle(txnHandle) {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(dataSrc), - m_txnHandle(txnHandle), - m_brokerCtxt(brokerCtxt) +AsyncOpTxnCommit::~AsyncOpTxnCommit() {} + +void +AsyncOpTxnCommit::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpTxnCommit::getOpStr() const { + return "TXN_COMMIT"; +} + + +// --- class AsyncOpTxnAbort --- + +AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_txnHandle(txnHandle) {} -AsyncOperation::~AsyncOperation() +AsyncOpTxnAbort::~AsyncOpTxnAbort() {} + +void +AsyncOpTxnAbort::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpTxnAbort::getOpStr() const { + return "TXN_ABORT"; +} + + +// --- class AsyncOpConfigCreate --- + +AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_cfgHandle(cfgHandle), + m_data(data) {} +AsyncOpConfigCreate::~AsyncOpConfigCreate() {} + +void +AsyncOpConfigCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + const char* -AsyncOperation::getOpStr() const -{ - return getOpStr(m_op); +AsyncOpConfigCreate::getOpStr() const { + return "CONFIG_CREATE"; } -boost::shared_ptr<qpid::broker::BrokerAsyncContext> -AsyncOperation::getBrokerContext() const -{ - return m_brokerCtxt; + +// --- class AsyncOpConfigDestroy --- + +AsyncOpConfigDestroy::AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_cfgHandle(cfgHandle) +{} + +AsyncOpConfigDestroy::~AsyncOpConfigDestroy() {} + +void +AsyncOpConfigDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); } -//static const char* -AsyncOperation::getOpStr(const opCode op) -{ - switch (op) { - case NONE: return "<none>"; - case TXN_PREPARE: return "TXN_PREPARE"; - case TXN_COMMIT: return "TXN_COMMIT"; - case TXN_ABORT: return "TXN_ABORT"; - case CONFIG_CREATE: return "CONFIG_CREATE"; - case CONFIG_DESTROY: return "CONFIG_DESTROY"; - case QUEUE_CREATE: return "QUEUE_CREATE"; - case QUEUE_FLUSH: return "QUEUE_FLUSH"; - case QUEUE_DESTROY: return "QUEUE_DESTROY"; - case EVENT_CREATE: return "EVENT_CREATE"; - case EVENT_DESTROY: return "EVENT_DESTROY"; - case MSG_ENQUEUE: return "MSG_ENQUEUE"; - case MSG_DEQUEUE: return "MSG_DEQUEUE"; - } - std::ostringstream oss; - oss << "AsyncStore: AsyncOperation::getOpStr(): Unknown op-code \"" << op << "\""; - throw qpid::Exception(oss.str()); +AsyncOpConfigDestroy::getOpStr() const { + return "CONFIG_DESTROY"; +} + + +// --- class AsyncOpQueueCreate --- + +AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_queueHandle(queueHandle), + m_data(data) +{} + +AsyncOpQueueCreate::~AsyncOpQueueCreate() {} + +void +AsyncOpQueueCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpQueueCreate::getOpStr() const { + return "QUEUE_CREATE"; +} + + +// --- class AsyncOpQueueFlush --- + +AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_queueHandle(queueHandle) +{} + +AsyncOpQueueFlush::~AsyncOpQueueFlush() {} + +void +AsyncOpQueueFlush::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpQueueFlush::getOpStr() const { + return "QUEUE_FLUSH"; +} + + +// --- class AsyncOpQueueDestroy --- + +AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_queueHandle(queueHandle) +{} + +AsyncOpQueueDestroy::~AsyncOpQueueDestroy() {} + +void +AsyncOpQueueDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpQueueDestroy::getOpStr() const { + return "QUEUE_DESTROY"; +} + + +// --- class AsyncOpEventCreate --- + +AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle, + const qpid::broker::DataSource* const data, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_evtHandle(evtHandle), + m_data(data), + m_txnHandle(txnHandle) +{} + +AsyncOpEventCreate::~AsyncOpEventCreate() {} + +void +AsyncOpEventCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpEventCreate::getOpStr() const { + return "EVENT_CREATE"; +} + + +// --- class AsyncOpEventDestroy --- + +AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_evtHandle(evtHandle), + m_txnHandle(txnHandle) +{} + +AsyncOpEventDestroy::~AsyncOpEventDestroy() {} + +void +AsyncOpEventDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpEventDestroy::getOpStr() const { + return "EVENT_DESTROY"; +} + + +// --- class AsyncOpMsgEnqueue --- + +AsyncOpMsgEnqueue::AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_enqHandle(enqHandle), + m_txnHandle(txnHandle) +{} + +AsyncOpMsgEnqueue::~AsyncOpMsgEnqueue() {} + +void AsyncOpMsgEnqueue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* AsyncOpMsgEnqueue::getOpStr() const { + return "MSG_ENQUEUE"; +} + + +// --- class AsyncOpMsgDequeue --- + +AsyncOpMsgDequeue::AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_enqHandle(enqHandle), + m_txnHandle(txnHandle) +{} + +AsyncOpMsgDequeue::~AsyncOpMsgDequeue() {} + +void AsyncOpMsgDequeue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* AsyncOpMsgDequeue::getOpStr() const { + return "MSG_DEQUEUE"; } }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h index cb73ad639b..2b195d7443 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.h +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -26,57 +26,185 @@ #include "qpid/broker/AsyncStore.h" +#include <boost/shared_ptr.hpp> + namespace qpid { namespace asyncStore { -class AsyncStoreHandle; +class AsyncStoreImpl; class AsyncOperation { public: - typedef enum {NONE=0, - TXN_PREPARE, - TXN_COMMIT, - TXN_ABORT, - CONFIG_CREATE, - CONFIG_DESTROY, - QUEUE_CREATE, - QUEUE_FLUSH, - QUEUE_DESTROY, - EVENT_CREATE, - EVENT_DESTROY, - MSG_ENQUEUE, - MSG_DEQUEUE - } opCode; - - AsyncOperation(); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); virtual ~AsyncOperation(); - const char* getOpStr() const; - static const char* getOpStr(const opCode op); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store) = 0; boost::shared_ptr<qpid::broker::BrokerAsyncContext> getBrokerContext() const; - + virtual const char* getOpStr() const = 0; +protected: + void submitResult(); + void submitResult(const int errNo, + const std::string& errMsg); private: - opCode m_op; - const AsyncStoreHandle* m_targetHandle; - const qpid::broker::DataSource* const m_dataSrc; - const qpid::broker::TxnHandle* m_txnHandle; boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt; }; + +class AsyncOpTxnPrepare: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpTxnPrepare(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpTxnCommit: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpTxnCommit(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpTxnAbort: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpTxnAbort(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpConfigCreate(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::ConfigHandle& m_cfgHandle; + const qpid::broker::DataSource* const m_data; +}; + + +class AsyncOpConfigDestroy: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpConfigDestroy(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::ConfigHandle& m_cfgHandle; +}; + + +class AsyncOpQueueCreate: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpQueueCreate(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::QueueHandle& m_queueHandle; + const qpid::broker::DataSource* const m_data; +}; + + +class AsyncOpQueueFlush: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpQueueFlush(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::QueueHandle& m_queueHandle; +}; + + +class AsyncOpQueueDestroy: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpQueueDestroy(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::QueueHandle& m_queueHandle; +}; + + +class AsyncOpEventCreate: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle, + const qpid::broker::DataSource* const data, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpEventCreate(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EventHandle& m_evtHandle; + const qpid::broker::DataSource* const m_data; + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpEventDestroy: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpEventDestroy(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EventHandle& m_evtHandle; + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpMsgEnqueue: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpMsgEnqueue(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EnqueueHandle& m_enqHandle; + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpMsgDequeue: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpMsgDequeue(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EnqueueHandle& m_enqHandle; + qpid::broker::TxnHandle& m_txnHandle; +}; + }} // namespace qpid::asyncStore #endif // qpid_asyncStore_AsyncOperation_h_ diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 6b5c3ac582..e8379b95e2 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -23,6 +23,7 @@ #include "AsyncStoreImpl.h" +#include "AsyncOperation.h" #include "ConfigHandleImpl.h" #include "EnqueueHandleImpl.h" #include "EventHandleImpl.h" @@ -37,6 +38,8 @@ #include "qpid/broker/QueueHandle.h" #include "qpid/broker/TxnHandle.h" +//#include <boost/make_shared.hpp> + namespace qpid { namespace asyncStore { @@ -94,9 +97,8 @@ void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, - dynamic_cast<AsyncStoreHandle*>(&txnHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -104,9 +106,8 @@ void AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, - dynamic_cast<AsyncStoreHandle*>(&txnHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -114,9 +115,8 @@ void AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, - dynamic_cast<AsyncStoreHandle*>(&txnHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -161,10 +161,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE, - dynamic_cast<AsyncStoreHandle*>(&cfgHandle), - dataSrc, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -172,9 +170,8 @@ void AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY, - dynamic_cast<AsyncStoreHandle*>(&cfgHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -183,10 +180,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE, - dynamic_cast<AsyncStoreHandle*>(&queueHandle), - dataSrc, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -194,9 +189,8 @@ void AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY, - dynamic_cast<AsyncStoreHandle*>(&queueHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -204,9 +198,8 @@ void AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH, - dynamic_cast<AsyncStoreHandle*>(&queueHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -216,11 +209,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<AsyncStoreHandle*>(&eventHandle), - dataSrc, - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -229,10 +219,8 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<AsyncStoreHandle*>(&eventHandle), - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -241,10 +229,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<AsyncStoreHandle*>(&enqHandle), - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -253,10 +239,8 @@ AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<AsyncStoreHandle*>(&enqHandle), - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index dd4e7c1343..b7b8970c39 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -54,7 +54,7 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) try { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext(); - if (bc) { + if (bc.get()) { qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); if (arq) { qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h index 143ac2ab0c..473b5f1cfb 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.h +++ b/cpp/src/qpid/asyncStore/OperationQueue.h @@ -26,7 +26,6 @@ #include "AsyncOperation.h" -//#include "qpid/broker/AsyncStore.h" #include "qpid/sys/PollableQueue.h" namespace qpid { @@ -43,6 +42,7 @@ private: typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncOperation> > OpQueue; OpQueue m_opQueue; + // Callback function for pollable queue, defined in qpid::sys::PollableQueue OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e); }; diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.h b/cpp/src/qpid/broker/AsyncResultQueueImpl.h index 01fff6a53a..805057400f 100644 --- a/cpp/src/qpid/broker/AsyncResultQueueImpl.h +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.h @@ -44,6 +44,7 @@ private: typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncResultHandle> > ResultQueue; ResultQueue m_resQueue; + // Callback function for pollable queue, defined in qpid::sys::PollableQueue ResultQueue::Batch::const_iterator handle(const ResultQueue::Batch& e); }; diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 7bb6175862..1c8f4b0737 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -46,6 +46,10 @@ public: virtual ~BrokerAsyncContext() {} virtual AsyncResultQueue* getAsyncResultQueue() const = 0; virtual void invokeCallback(const AsyncResultHandle* const) const = 0; + void setOpStr(const char* opStr) { m_opStr = opStr; } + const char* getOpStr() const { return m_opStr; } +private: + const char* m_opStr; }; class DataSource { @@ -83,6 +87,7 @@ public: boost::shared_ptr<BrokerAsyncContext>) = 0; virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + void testOp() const {} }; // Subclassed by store: diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp index c3d4342993..63e2de2b41 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.cpp +++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp @@ -27,13 +27,9 @@ namespace qpid { namespace broker { TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, - TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, - qpid::broker::AsyncResultCallback rcb, - qpid::broker::AsyncResultQueue* const arq): + AsyncResultCallback rcb, + AsyncResultQueue* const arq): m_tb(tb), - m_th(th), - m_op(op), m_rcb(rcb), m_arq(arq) {} @@ -47,24 +43,6 @@ TxnAsyncContext::getTxnBuffer() const return m_tb; } -qpid::asyncStore::AsyncOperation::opCode -TxnAsyncContext::getOpCode() const -{ - return m_op; -} - -const char* -TxnAsyncContext::getOpStr() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - -TxnHandle& -TxnAsyncContext::getTransactionContext() const -{ - return m_th; -} - AsyncResultQueue* TxnAsyncContext::getAsyncResultQueue() const { diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h index 810c46429c..0c35b110a8 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.h +++ b/cpp/src/qpid/broker/TxnAsyncContext.h @@ -29,6 +29,9 @@ #include "qpid/asyncStore/AsyncOperation.h" namespace qpid { +namespace asyncStore { +class AsyncOperation; +} namespace broker { class TxnHandle; @@ -39,15 +42,10 @@ class TxnAsyncContext: public BrokerAsyncContext { public: TxnAsyncContext(TxnBuffer* const tb, - TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, - qpid::broker::AsyncResultCallback rcb, - qpid::broker::AsyncResultQueue* const arq); + AsyncResultCallback rcb, + AsyncResultQueue* const arq); virtual ~TxnAsyncContext(); TxnBuffer* getTxnBuffer() const; - qpid::asyncStore::AsyncOperation::opCode getOpCode() const; - const char* getOpStr() const; - TxnHandle& getTransactionContext() const; // --- Interface BrokerAsyncContext --- AsyncResultQueue* getAsyncResultQueue() const; @@ -55,8 +53,6 @@ public: private: TxnBuffer* const m_tb; - TxnHandle& m_th; - const qpid::asyncStore::AsyncOperation::opCode m_op; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp index 425d725e9e..ba00293452 100644 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ b/cpp/src/qpid/broker/TxnBuffer.cpp @@ -96,26 +96,6 @@ TxnBuffer::commitLocal(AsyncTransaction* const store) return false; } -// static -void -TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh) -{ - if (arh) { - boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - QPID_LOG(error, "TxnBuffer::handleAsyncResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() - << " (" << arh->getErrMsg() << ")"); - tac->getTxnBuffer()->asyncLocalAbort(); - } else { - if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) { - tac->getTxnBuffer()->asyncLocalAbort(); - } else { - tac->getTxnBuffer()->asyncLocalCommit(); - } - } - } -} - void TxnBuffer::asyncLocalCommit() { @@ -130,10 +110,9 @@ TxnBuffer::asyncLocalCommit() m_state = COMMIT; { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, - m_txnHandle, - qpid::asyncStore::AsyncOperation::TXN_COMMIT, - &handleAsyncResult, + &handleAsyncCommitResult, &m_resultQueue)); + m_store->testOp(); m_store->submitCommit(m_txnHandle, tac); } break; @@ -147,6 +126,21 @@ TxnBuffer::asyncLocalCommit() } } +//static +void +TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + QPID_LOG(error, "TxnBuffer::handleAsyncCommitResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() + << " (" << arh->getErrMsg() << ")"); + tac->getTxnBuffer()->asyncLocalAbort(); + } else { + tac->getTxnBuffer()->asyncLocalCommit(); + } + } +} + void TxnBuffer::asyncLocalAbort() { @@ -158,9 +152,7 @@ TxnBuffer::asyncLocalAbort() m_state = ROLLBACK; { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, - m_txnHandle, - qpid::asyncStore::AsyncOperation::TXN_ABORT, - &handleAsyncResult, + &handleAsyncAbortResult, &m_resultQueue)); m_store->submitCommit(m_txnHandle, tac); } @@ -173,4 +165,17 @@ TxnBuffer::asyncLocalAbort() } } +//static +void +TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() + << " (" << arh->getErrMsg() << ")"); + } + tac->getTxnBuffer()->asyncLocalAbort(); + } +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h index cd78846b71..bd9743cad7 100644 --- a/cpp/src/qpid/broker/TxnBuffer.h +++ b/cpp/src/qpid/broker/TxnBuffer.h @@ -51,9 +51,10 @@ public: bool commitLocal(AsyncTransaction* const store); // --- Async operations --- - static void handleAsyncResult(const AsyncResultHandle* const arh); void asyncLocalCommit(); + static void handleAsyncCommitResult(const AsyncResultHandle* const arh); void asyncLocalAbort(); + static void handleAsyncAbortResult(const AsyncResultHandle* const arh); private: std::vector<boost::shared_ptr<TxnOp> > m_ops; |