diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-07-20 12:55:20 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-07-20 12:55:20 +0000 |
commit | 2e437e1569009d8e8ed3ed896d751994e2e85d74 (patch) | |
tree | 28459e48638bfcd3a7e565c37165b3fab714d484 | |
parent | c94c9b5333c06c03deb6a6dcb1a91ecdf111b481 (diff) | |
download | qpid-python-2e437e1569009d8e8ed3ed896d751994e2e85d74.tar.gz |
QPID-3858: WIP: Created many async operation classes for each op instead of a single class with op codes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1363759 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.cpp | 363 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.h | 208 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 70 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AsyncResultQueueImpl.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AsyncStore.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnAsyncContext.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnAsyncContext.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnBuffer.cpp | 57 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnBuffer.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp | 14 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h | 4 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp | 16 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h | 7 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp | 133 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h | 16 |
17 files changed, 613 insertions, 328 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp index b8fdb8b140..a22f803fcd 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -23,101 +23,320 @@ #include "AsyncOperation.h" -#include "qpid/Exception.h" +//#include "qpid/Exception.h" +#include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/AsyncResultHandleImpl.h" -#include <sstream> +//#include <sstream> namespace qpid { namespace asyncStore { -AsyncOperation::AsyncOperation() : - m_op(NONE), - m_targetHandle(), - m_dataSrc(0), - m_txnHandle(0) +AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + m_brokerCtxt(brokerCtxt) {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(0), - m_txnHandle(0), - m_brokerCtxt(brokerCtxt) +AsyncOperation::~AsyncOperation() {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(dataSrc), - m_txnHandle(0), - m_brokerCtxt(brokerCtxt) +boost::shared_ptr<qpid::broker::BrokerAsyncContext> AsyncOperation::getBrokerContext() const +{ + return m_brokerCtxt; +} + +void +AsyncOperation::submitResult() +{ + return submitResult(0, ""); +} + +void +AsyncOperation::submitResult(const int errNo, + const std::string& errMsg) +{ + if (m_brokerCtxt.get()) { + qpid::broker::AsyncResultQueue* const arq = m_brokerCtxt->getAsyncResultQueue(); + if (arq) { + qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(errNo, errMsg, m_brokerCtxt); + boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi)); + arq->submit(arh); + } + } +} + + +// --- class AsyncOpTxnPrepare --- + +AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_txnHandle(txnHandle) {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(0), - m_txnHandle(txnHandle), - m_brokerCtxt(brokerCtxt) +AsyncOpTxnPrepare::~AsyncOpTxnPrepare() {} + +void +AsyncOpTxnPrepare::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpTxnPrepare::getOpStr() const { + return "TXN_PREPARE"; +} + + + +// --- class AsyncOpTxnCommit --- + +AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_txnHandle(txnHandle) {} -AsyncOperation::AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_op(op), - m_targetHandle(th), - m_dataSrc(dataSrc), - m_txnHandle(txnHandle), - m_brokerCtxt(brokerCtxt) +AsyncOpTxnCommit::~AsyncOpTxnCommit() {} + +void +AsyncOpTxnCommit::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpTxnCommit::getOpStr() const { + return "TXN_COMMIT"; +} + + +// --- class AsyncOpTxnAbort --- + +AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_txnHandle(txnHandle) {} -AsyncOperation::~AsyncOperation() +AsyncOpTxnAbort::~AsyncOpTxnAbort() {} + +void +AsyncOpTxnAbort::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpTxnAbort::getOpStr() const { + return "TXN_ABORT"; +} + + +// --- class AsyncOpConfigCreate --- + +AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_cfgHandle(cfgHandle), + m_data(data) {} +AsyncOpConfigCreate::~AsyncOpConfigCreate() {} + +void +AsyncOpConfigCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + const char* -AsyncOperation::getOpStr() const -{ - return getOpStr(m_op); +AsyncOpConfigCreate::getOpStr() const { + return "CONFIG_CREATE"; } -boost::shared_ptr<qpid::broker::BrokerAsyncContext> -AsyncOperation::getBrokerContext() const -{ - return m_brokerCtxt; + +// --- class AsyncOpConfigDestroy --- + +AsyncOpConfigDestroy::AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_cfgHandle(cfgHandle) +{} + +AsyncOpConfigDestroy::~AsyncOpConfigDestroy() {} + +void +AsyncOpConfigDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); } -//static const char* -AsyncOperation::getOpStr(const opCode op) -{ - switch (op) { - case NONE: return "<none>"; - case TXN_PREPARE: return "TXN_PREPARE"; - case TXN_COMMIT: return "TXN_COMMIT"; - case TXN_ABORT: return "TXN_ABORT"; - case CONFIG_CREATE: return "CONFIG_CREATE"; - case CONFIG_DESTROY: return "CONFIG_DESTROY"; - case QUEUE_CREATE: return "QUEUE_CREATE"; - case QUEUE_FLUSH: return "QUEUE_FLUSH"; - case QUEUE_DESTROY: return "QUEUE_DESTROY"; - case EVENT_CREATE: return "EVENT_CREATE"; - case EVENT_DESTROY: return "EVENT_DESTROY"; - case MSG_ENQUEUE: return "MSG_ENQUEUE"; - case MSG_DEQUEUE: return "MSG_DEQUEUE"; - } - std::ostringstream oss; - oss << "AsyncStore: AsyncOperation::getOpStr(): Unknown op-code \"" << op << "\""; - throw qpid::Exception(oss.str()); +AsyncOpConfigDestroy::getOpStr() const { + return "CONFIG_DESTROY"; +} + + +// --- class AsyncOpQueueCreate --- + +AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_queueHandle(queueHandle), + m_data(data) +{} + +AsyncOpQueueCreate::~AsyncOpQueueCreate() {} + +void +AsyncOpQueueCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpQueueCreate::getOpStr() const { + return "QUEUE_CREATE"; +} + + +// --- class AsyncOpQueueFlush --- + +AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_queueHandle(queueHandle) +{} + +AsyncOpQueueFlush::~AsyncOpQueueFlush() {} + +void +AsyncOpQueueFlush::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpQueueFlush::getOpStr() const { + return "QUEUE_FLUSH"; +} + + +// --- class AsyncOpQueueDestroy --- + +AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_queueHandle(queueHandle) +{} + +AsyncOpQueueDestroy::~AsyncOpQueueDestroy() {} + +void +AsyncOpQueueDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpQueueDestroy::getOpStr() const { + return "QUEUE_DESTROY"; +} + + +// --- class AsyncOpEventCreate --- + +AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle, + const qpid::broker::DataSource* const data, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_evtHandle(evtHandle), + m_data(data), + m_txnHandle(txnHandle) +{} + +AsyncOpEventCreate::~AsyncOpEventCreate() {} + +void +AsyncOpEventCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpEventCreate::getOpStr() const { + return "EVENT_CREATE"; +} + + +// --- class AsyncOpEventDestroy --- + +AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_evtHandle(evtHandle), + m_txnHandle(txnHandle) +{} + +AsyncOpEventDestroy::~AsyncOpEventDestroy() {} + +void +AsyncOpEventDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* +AsyncOpEventDestroy::getOpStr() const { + return "EVENT_DESTROY"; +} + + +// --- class AsyncOpMsgEnqueue --- + +AsyncOpMsgEnqueue::AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_enqHandle(enqHandle), + m_txnHandle(txnHandle) +{} + +AsyncOpMsgEnqueue::~AsyncOpMsgEnqueue() {} + +void AsyncOpMsgEnqueue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* AsyncOpMsgEnqueue::getOpStr() const { + return "MSG_ENQUEUE"; +} + + +// --- class AsyncOpMsgDequeue --- + +AsyncOpMsgDequeue::AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : + AsyncOperation(brokerCtxt), + m_enqHandle(enqHandle), + m_txnHandle(txnHandle) +{} + +AsyncOpMsgDequeue::~AsyncOpMsgDequeue() {} + +void AsyncOpMsgDequeue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { + // TODO: Implement store operation here + submitResult(); +} + +const char* AsyncOpMsgDequeue::getOpStr() const { + return "MSG_DEQUEUE"; } }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h index cb73ad639b..2b195d7443 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.h +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -26,57 +26,185 @@ #include "qpid/broker/AsyncStore.h" +#include <boost/shared_ptr.hpp> + namespace qpid { namespace asyncStore { -class AsyncStoreHandle; +class AsyncStoreImpl; class AsyncOperation { public: - typedef enum {NONE=0, - TXN_PREPARE, - TXN_COMMIT, - TXN_ABORT, - CONFIG_CREATE, - CONFIG_DESTROY, - QUEUE_CREATE, - QUEUE_FLUSH, - QUEUE_DESTROY, - EVENT_CREATE, - EVENT_DESTROY, - MSG_ENQUEUE, - MSG_DEQUEUE - } opCode; - - AsyncOperation(); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); - AsyncOperation(const opCode op, - const AsyncStoreHandle* th, - const qpid::broker::DataSource* const dataSrc, - const qpid::broker::TxnHandle* txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); virtual ~AsyncOperation(); - const char* getOpStr() const; - static const char* getOpStr(const opCode op); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store) = 0; boost::shared_ptr<qpid::broker::BrokerAsyncContext> getBrokerContext() const; - + virtual const char* getOpStr() const = 0; +protected: + void submitResult(); + void submitResult(const int errNo, + const std::string& errMsg); private: - opCode m_op; - const AsyncStoreHandle* m_targetHandle; - const qpid::broker::DataSource* const m_dataSrc; - const qpid::broker::TxnHandle* m_txnHandle; boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt; }; + +class AsyncOpTxnPrepare: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpTxnPrepare(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpTxnCommit: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpTxnCommit(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpTxnAbort: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpTxnAbort(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpConfigCreate(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::ConfigHandle& m_cfgHandle; + const qpid::broker::DataSource* const m_data; +}; + + +class AsyncOpConfigDestroy: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpConfigDestroy(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::ConfigHandle& m_cfgHandle; +}; + + +class AsyncOpQueueCreate: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, + const qpid::broker::DataSource* const data, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpQueueCreate(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::QueueHandle& m_queueHandle; + const qpid::broker::DataSource* const m_data; +}; + + +class AsyncOpQueueFlush: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpQueueFlush(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::QueueHandle& m_queueHandle; +}; + + +class AsyncOpQueueDestroy: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpQueueDestroy(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::QueueHandle& m_queueHandle; +}; + + +class AsyncOpEventCreate: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle, + const qpid::broker::DataSource* const data, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpEventCreate(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EventHandle& m_evtHandle; + const qpid::broker::DataSource* const m_data; + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpEventDestroy: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpEventDestroy(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EventHandle& m_evtHandle; + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpMsgEnqueue: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpMsgEnqueue(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EnqueueHandle& m_enqHandle; + qpid::broker::TxnHandle& m_txnHandle; +}; + + +class AsyncOpMsgDequeue: public qpid::asyncStore::AsyncOperation { +public: + AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle, + qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + virtual ~AsyncOpMsgDequeue(); + virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual const char* getOpStr() const; +private: + qpid::broker::EnqueueHandle& m_enqHandle; + qpid::broker::TxnHandle& m_txnHandle; +}; + }} // namespace qpid::asyncStore #endif // qpid_asyncStore_AsyncOperation_h_ diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 6b5c3ac582..e8379b95e2 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -23,6 +23,7 @@ #include "AsyncStoreImpl.h" +#include "AsyncOperation.h" #include "ConfigHandleImpl.h" #include "EnqueueHandleImpl.h" #include "EventHandleImpl.h" @@ -37,6 +38,8 @@ #include "qpid/broker/QueueHandle.h" #include "qpid/broker/TxnHandle.h" +//#include <boost/make_shared.hpp> + namespace qpid { namespace asyncStore { @@ -94,9 +97,8 @@ void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, - dynamic_cast<AsyncStoreHandle*>(&txnHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -104,9 +106,8 @@ void AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, - dynamic_cast<AsyncStoreHandle*>(&txnHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -114,9 +115,8 @@ void AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, - dynamic_cast<AsyncStoreHandle*>(&txnHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -161,10 +161,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE, - dynamic_cast<AsyncStoreHandle*>(&cfgHandle), - dataSrc, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -172,9 +170,8 @@ void AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY, - dynamic_cast<AsyncStoreHandle*>(&cfgHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -183,10 +180,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE, - dynamic_cast<AsyncStoreHandle*>(&queueHandle), - dataSrc, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -194,9 +189,8 @@ void AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY, - dynamic_cast<AsyncStoreHandle*>(&queueHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -204,9 +198,8 @@ void AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH, - dynamic_cast<AsyncStoreHandle*>(&queueHandle), - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -216,11 +209,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<AsyncStoreHandle*>(&eventHandle), - dataSrc, - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -229,10 +219,8 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<AsyncStoreHandle*>(&eventHandle), - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -241,10 +229,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<AsyncStoreHandle*>(&enqHandle), - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -253,10 +239,8 @@ AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<AsyncStoreHandle*>(&enqHandle), - &txnHandle, - brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, brokerCtxt)); + brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index dd4e7c1343..b7b8970c39 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -54,7 +54,7 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) try { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext(); - if (bc) { + if (bc.get()) { qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); if (arq) { qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h index 143ac2ab0c..473b5f1cfb 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.h +++ b/cpp/src/qpid/asyncStore/OperationQueue.h @@ -26,7 +26,6 @@ #include "AsyncOperation.h" -//#include "qpid/broker/AsyncStore.h" #include "qpid/sys/PollableQueue.h" namespace qpid { @@ -43,6 +42,7 @@ private: typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncOperation> > OpQueue; OpQueue m_opQueue; + // Callback function for pollable queue, defined in qpid::sys::PollableQueue OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e); }; diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.h b/cpp/src/qpid/broker/AsyncResultQueueImpl.h index 01fff6a53a..805057400f 100644 --- a/cpp/src/qpid/broker/AsyncResultQueueImpl.h +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.h @@ -44,6 +44,7 @@ private: typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncResultHandle> > ResultQueue; ResultQueue m_resQueue; + // Callback function for pollable queue, defined in qpid::sys::PollableQueue ResultQueue::Batch::const_iterator handle(const ResultQueue::Batch& e); }; diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 7bb6175862..1c8f4b0737 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -46,6 +46,10 @@ public: virtual ~BrokerAsyncContext() {} virtual AsyncResultQueue* getAsyncResultQueue() const = 0; virtual void invokeCallback(const AsyncResultHandle* const) const = 0; + void setOpStr(const char* opStr) { m_opStr = opStr; } + const char* getOpStr() const { return m_opStr; } +private: + const char* m_opStr; }; class DataSource { @@ -83,6 +87,7 @@ public: boost::shared_ptr<BrokerAsyncContext>) = 0; virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + void testOp() const {} }; // Subclassed by store: diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp index c3d4342993..63e2de2b41 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.cpp +++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp @@ -27,13 +27,9 @@ namespace qpid { namespace broker { TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, - TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, - qpid::broker::AsyncResultCallback rcb, - qpid::broker::AsyncResultQueue* const arq): + AsyncResultCallback rcb, + AsyncResultQueue* const arq): m_tb(tb), - m_th(th), - m_op(op), m_rcb(rcb), m_arq(arq) {} @@ -47,24 +43,6 @@ TxnAsyncContext::getTxnBuffer() const return m_tb; } -qpid::asyncStore::AsyncOperation::opCode -TxnAsyncContext::getOpCode() const -{ - return m_op; -} - -const char* -TxnAsyncContext::getOpStr() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - -TxnHandle& -TxnAsyncContext::getTransactionContext() const -{ - return m_th; -} - AsyncResultQueue* TxnAsyncContext::getAsyncResultQueue() const { diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h index 810c46429c..0c35b110a8 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.h +++ b/cpp/src/qpid/broker/TxnAsyncContext.h @@ -29,6 +29,9 @@ #include "qpid/asyncStore/AsyncOperation.h" namespace qpid { +namespace asyncStore { +class AsyncOperation; +} namespace broker { class TxnHandle; @@ -39,15 +42,10 @@ class TxnAsyncContext: public BrokerAsyncContext { public: TxnAsyncContext(TxnBuffer* const tb, - TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, - qpid::broker::AsyncResultCallback rcb, - qpid::broker::AsyncResultQueue* const arq); + AsyncResultCallback rcb, + AsyncResultQueue* const arq); virtual ~TxnAsyncContext(); TxnBuffer* getTxnBuffer() const; - qpid::asyncStore::AsyncOperation::opCode getOpCode() const; - const char* getOpStr() const; - TxnHandle& getTransactionContext() const; // --- Interface BrokerAsyncContext --- AsyncResultQueue* getAsyncResultQueue() const; @@ -55,8 +53,6 @@ public: private: TxnBuffer* const m_tb; - TxnHandle& m_th; - const qpid::asyncStore::AsyncOperation::opCode m_op; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp index 425d725e9e..ba00293452 100644 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ b/cpp/src/qpid/broker/TxnBuffer.cpp @@ -96,26 +96,6 @@ TxnBuffer::commitLocal(AsyncTransaction* const store) return false; } -// static -void -TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh) -{ - if (arh) { - boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - QPID_LOG(error, "TxnBuffer::handleAsyncResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() - << " (" << arh->getErrMsg() << ")"); - tac->getTxnBuffer()->asyncLocalAbort(); - } else { - if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) { - tac->getTxnBuffer()->asyncLocalAbort(); - } else { - tac->getTxnBuffer()->asyncLocalCommit(); - } - } - } -} - void TxnBuffer::asyncLocalCommit() { @@ -130,10 +110,9 @@ TxnBuffer::asyncLocalCommit() m_state = COMMIT; { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, - m_txnHandle, - qpid::asyncStore::AsyncOperation::TXN_COMMIT, - &handleAsyncResult, + &handleAsyncCommitResult, &m_resultQueue)); + m_store->testOp(); m_store->submitCommit(m_txnHandle, tac); } break; @@ -147,6 +126,21 @@ TxnBuffer::asyncLocalCommit() } } +//static +void +TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + QPID_LOG(error, "TxnBuffer::handleAsyncCommitResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() + << " (" << arh->getErrMsg() << ")"); + tac->getTxnBuffer()->asyncLocalAbort(); + } else { + tac->getTxnBuffer()->asyncLocalCommit(); + } + } +} + void TxnBuffer::asyncLocalAbort() { @@ -158,9 +152,7 @@ TxnBuffer::asyncLocalAbort() m_state = ROLLBACK; { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, - m_txnHandle, - qpid::asyncStore::AsyncOperation::TXN_ABORT, - &handleAsyncResult, + &handleAsyncAbortResult, &m_resultQueue)); m_store->submitCommit(m_txnHandle, tac); } @@ -173,4 +165,17 @@ TxnBuffer::asyncLocalAbort() } } +//static +void +TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() + << " (" << arh->getErrMsg() << ")"); + } + tac->getTxnBuffer()->asyncLocalAbort(); + } +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h index cd78846b71..bd9743cad7 100644 --- a/cpp/src/qpid/broker/TxnBuffer.h +++ b/cpp/src/qpid/broker/TxnBuffer.h @@ -51,9 +51,10 @@ public: bool commitLocal(AsyncTransaction* const store); // --- Async operations --- - static void handleAsyncResult(const AsyncResultHandle* const arh); void asyncLocalCommit(); + static void handleAsyncCommitResult(const AsyncResultHandle* const arh); void asyncLocalAbort(); + static void handleAsyncAbortResult(const AsyncResultHandle* const arh); private: std::vector<boost::shared_ptr<TxnOp> > m_ops; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp index 5bcf3fe401..e3bfe9ae7a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp @@ -32,10 +32,8 @@ namespace storePerftools { namespace asyncPerf { MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, - const qpid::asyncStore::AsyncOperation::opCode op, boost::shared_ptr<SimpleQueue> q) : m_msg(msg), - m_op(op), m_q(q) { assert(m_msg.get() != 0); @@ -45,18 +43,6 @@ MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg MessageAsyncContext::~MessageAsyncContext() {} -qpid::asyncStore::AsyncOperation::opCode -MessageAsyncContext::getOpCode() const -{ - return m_op; -} - -const char* -MessageAsyncContext::getOpStr() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - boost::intrusive_ptr<SimpleMessage> MessageAsyncContext::getMessage() const { diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h index 77d7be286b..9252fbda45 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h @@ -40,18 +40,14 @@ class MessageAsyncContext : public qpid::broker::BrokerAsyncContext { public: MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, - const qpid::asyncStore::AsyncOperation::opCode op, boost::shared_ptr<SimpleQueue> q); virtual ~MessageAsyncContext(); - qpid::asyncStore::AsyncOperation::opCode getOpCode() const; - const char* getOpStr() const; boost::intrusive_ptr<SimpleMessage> getMessage() const; boost::shared_ptr<SimpleQueue> getQueue() const; void destroy(); private: boost::intrusive_ptr<SimpleMessage> m_msg; - const qpid::asyncStore::AsyncOperation::opCode m_op; boost::shared_ptr<SimpleQueue> m_q; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp index f2eea9bad3..0312f61d3c 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp @@ -33,12 +33,10 @@ namespace asyncPerf { QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, qpid::broker::TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq) : m_q(q), m_th(th), - m_op(op), m_rcb(rcb), m_arq(arq) { @@ -48,13 +46,11 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, boost::intrusive_ptr<SimpleMessage> msg, qpid::broker::TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq) : m_q(q), m_msg(msg), m_th(th), - m_op(op), m_rcb(rcb), m_arq(arq) { @@ -65,18 +61,6 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, QueueAsyncContext::~QueueAsyncContext() {} -qpid::asyncStore::AsyncOperation::opCode -QueueAsyncContext::getOpCode() const -{ - return m_op; -} - -const char* -QueueAsyncContext::getOpStr() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - boost::shared_ptr<SimpleQueue> QueueAsyncContext::getQueue() const { diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index e3e87b8ad8..4e3d9fe2db 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -49,18 +49,14 @@ class QueueAsyncContext: public qpid::broker::BrokerAsyncContext public: QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, qpid::broker::TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, boost::intrusive_ptr<SimpleMessage> msg, qpid::broker::TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); virtual ~QueueAsyncContext(); - qpid::asyncStore::AsyncOperation::opCode getOpCode() const; - const char* getOpStr() const; boost::shared_ptr<SimpleQueue> getQueue() const; boost::intrusive_ptr<SimpleMessage> getMessage() const; qpid::broker::TxnHandle getTxnHandle() const; @@ -72,8 +68,7 @@ public: private: boost::shared_ptr<SimpleQueue> m_q; boost::intrusive_ptr<SimpleMessage> m_msg; - qpid::broker::TxnHandle m_th; - const qpid::asyncStore::AsyncOperation::opCode m_op; + qpid::broker::TxnHandle m_th; // TODO: get rid of this qpid::broker::AsyncResultCallback m_rcb; qpid::broker::AsyncResultQueue* const m_arq; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index 2a6f2b208b..79b8b46919 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -33,7 +33,6 @@ #include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/TxnHandle.h" -#include <boost/make_shared.hpp> #include <string.h> // memcpy() namespace tests { @@ -69,43 +68,6 @@ SimpleQueue::SimpleQueue(const std::string& name, SimpleQueue::~SimpleQueue() {} -// static -void -SimpleQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh) -{ - if (arh) { - boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - // Handle async success here - switch(qc->getOpCode()) { - case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: - qc->getQueue()->createComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: - qc->getQueue()->flushComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: - qc->getQueue()->destroyComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: - qc->getQueue()->enqueueComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: - qc->getQueue()->dequeueComplete(qc); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::SimpleQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); - throw qpid::Exception(oss.str()); - }; - } - } -} - const qpid::broker::QueueHandle& SimpleQueue::getHandle() const { @@ -130,16 +92,28 @@ SimpleQueue::asyncCreate() if (m_store) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), s_nullTxnHandle, - qpid::asyncStore::AsyncOperation::QUEUE_CREATE, - &handleAsyncResult, + &handleAsyncCreateResult, &m_resultQueue)); - m_store->submitCreate(m_queueHandle, - this, - qac); + m_store->submitCreate(m_queueHandle, this, qac); ++m_asyncOpCounter; } } +//static +void +SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->createComplete(qc); + } + } +} + void SimpleQueue::asyncDestroy(const bool deleteQueue) { @@ -148,25 +122,38 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) if (deleteQueue) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), s_nullTxnHandle, - qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, - &handleAsyncResult, + &handleAsyncDestroyResult, &m_resultQueue)); - m_store->submitDestroy(m_queueHandle, - qac); + m_store->submitDestroy(m_queueHandle, qac); ++m_asyncOpCounter; } m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); } } +//static +void +SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->destroyComplete(qc); + } + } +} + void SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { boost::shared_ptr<QueuedMessage> qm; if (msg->isPersistent() && m_store) { - qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); } else { - qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)); } enqueue(s_nullTxnHandle, qm); push(qm); @@ -231,9 +218,9 @@ SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { boost::shared_ptr<QueuedMessage> qm; if (msg->isPersistent() && m_store) { - qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); } else { - qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)); } push(qm); } @@ -357,9 +344,6 @@ void SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, bool /*isRecovery*/) { -boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); -assert(pqm.get()); - m_messages->push(qm); } @@ -375,8 +359,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), pqm->payload(), th, - qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, - &handleAsyncResult, + &handleAsyncEnqueueResult, &m_resultQueue)); // TODO : This must be done from inside store, not here if (th.isValid()) { @@ -389,6 +372,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, return true; } +// private static +void +SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->enqueueComplete(qc); + } + } +} + // private bool SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, @@ -398,8 +396,7 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), pqm->payload(), th, - qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, - &handleAsyncResult, + &handleAsyncDequeueResult, &m_resultQueue)); // TODO : This must be done from inside store, not here if (th.isValid()) { @@ -411,6 +408,20 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, ++m_asyncOpCounter; return true; } +// private static +void +SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->dequeueComplete(qc); + } + } +} // private void @@ -455,9 +466,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) assert(qc->getQueue().get() == this); --m_asyncOpCounter; - qpid::broker::TxnHandle th = qc->getTxnHandle(); - // TODO : This must be done from inside store, not here + qpid::broker::TxnHandle th = qc->getTxnHandle(); if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } @@ -470,9 +480,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) assert(qc->getQueue().get() == this); --m_asyncOpCounter; - qpid::broker::TxnHandle th = qc->getTxnHandle(); - // TODO : This must be done from inside store, not here + qpid::broker::TxnHandle th = qc->getTxnHandle(); if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h index 2763ae3159..f13febbafa 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h @@ -35,9 +35,6 @@ #include <boost/enable_shared_from_this.hpp> namespace qpid { -namespace asyncStore { -//class AsyncStoreImpl; -} namespace broker { class AsyncResultQueue; } @@ -67,13 +64,14 @@ public: qpid::broker::AsyncResultQueue& arq); virtual ~SimpleQueue(); - static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res); const qpid::broker::QueueHandle& getHandle() const; qpid::broker::QueueHandle& getHandle(); qpid::broker::AsyncStore* getStore(); void asyncCreate(); + static void handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh); void asyncDestroy(const bool deleteQueue); + static void handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh); // --- Methods in msg handling path from qpid::Queue --- void deliver(boost::intrusive_ptr<SimpleMessage> msg); @@ -98,7 +96,7 @@ public: virtual const std::string& getName() const; virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); - // --- Interface DataStore --- + // --- Interface qpid::broker::DataStore --- virtual uint64_t getSize(); virtual void write(char* target); @@ -116,8 +114,7 @@ private: bool m_destroyed; // --- Members & methods in msg handling path copied from qpid::Queue --- - struct UsageBarrier - { + struct UsageBarrier { SimpleQueue& m_parent; uint32_t m_count; qpid::sys::Monitor m_monitor; @@ -126,8 +123,7 @@ private: void release(); void destroy(); }; - struct ScopedUse - { + struct ScopedUse { UsageBarrier& m_barrier; const bool m_acquired; ScopedUse(UsageBarrier& b); @@ -141,8 +137,10 @@ private: // -- Async ops --- bool asyncEnqueue(qpid::broker::TxnHandle& th, boost::shared_ptr<PersistableQueuedMessage> pqm); + static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh); bool asyncDequeue(qpid::broker::TxnHandle& th, boost::shared_ptr<PersistableQueuedMessage> pqm); + static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh); // --- Async op counter --- void destroyCheck(const std::string& opDescr) const; |