diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.cpp | 94 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncOperation.h | 71 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AsyncStore.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleTxnBuffer.cpp | 1 |
6 files changed, 110 insertions, 91 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp index 3a53b4df1b..1a67a15d6f 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -31,8 +31,10 @@ namespace qpid { namespace asyncStore { -AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - m_brokerCtxt(brokerCtxt) +AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store) : + m_brokerCtxt(brokerCtxt), + m_store(store) {} AsyncOperation::~AsyncOperation() {} @@ -42,13 +44,13 @@ boost::shared_ptr<qpid::broker::BrokerAsyncContext> AsyncOperation::getBrokerCon } void -AsyncOperation::submitResult() { +AsyncOperation::submitResult() const { return submitResult(0, ""); } void AsyncOperation::submitResult(const int errNo, - const std::string& errMsg) { + const std::string& errMsg) const { if (m_brokerCtxt.get()) { qpid::broker::AsyncResultQueue* const arq = m_brokerCtxt->getAsyncResultQueue(); if (arq) { @@ -63,15 +65,16 @@ AsyncOperation::submitResult(const int errNo, // --- class AsyncOpTxnPrepare --- AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt) : - AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)), + boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt), store), m_txnHandle(txnHandle) {} AsyncOpTxnPrepare::~AsyncOpTxnPrepare() {} void -AsyncOpTxnPrepare::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +AsyncOpTxnPrepare::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -86,15 +89,16 @@ AsyncOpTxnPrepare::getOpStr() const { // --- class AsyncOpTxnCommit --- AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) : - AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)), + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt), store), m_txnHandle(txnHandle) {} AsyncOpTxnCommit::~AsyncOpTxnCommit() {} void -AsyncOpTxnCommit::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +AsyncOpTxnCommit::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -108,15 +112,16 @@ AsyncOpTxnCommit::getOpStr() const { // --- class AsyncOpTxnAbort --- AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) : - AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)), + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt), store), m_txnHandle(txnHandle) {} AsyncOpTxnAbort::~AsyncOpTxnAbort() {} void -AsyncOpTxnAbort::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +AsyncOpTxnAbort::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -131,8 +136,9 @@ AsyncOpTxnAbort::getOpStr() const { AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const data, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(brokerCtxt, store), m_cfgHandle(cfgHandle), m_data(data) {} @@ -140,7 +146,7 @@ AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, AsyncOpConfigCreate::~AsyncOpConfigCreate() {} void -AsyncOpConfigCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +AsyncOpConfigCreate::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -154,15 +160,16 @@ AsyncOpConfigCreate::getOpStr() const { // --- class AsyncOpConfigDestroy --- AsyncOpConfigDestroy::AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(brokerCtxt, store), m_cfgHandle(cfgHandle) {} AsyncOpConfigDestroy::~AsyncOpConfigDestroy() {} void -AsyncOpConfigDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +AsyncOpConfigDestroy::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -177,8 +184,9 @@ AsyncOpConfigDestroy::getOpStr() const { AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const data, - boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) : - AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)), + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt), store), m_queueHandle(queueHandle), m_data(data) {} @@ -186,7 +194,7 @@ AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, AsyncOpQueueCreate::~AsyncOpQueueCreate() {} void -AsyncOpQueueCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +AsyncOpQueueCreate::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -200,15 +208,16 @@ AsyncOpQueueCreate::getOpStr() const { // --- class AsyncOpQueueFlush --- AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) : - AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)), + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt), store), m_queueHandle(queueHandle) {} AsyncOpQueueFlush::~AsyncOpQueueFlush() {} void -AsyncOpQueueFlush::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +AsyncOpQueueFlush::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -222,15 +231,16 @@ AsyncOpQueueFlush::getOpStr() const { // --- class AsyncOpQueueDestroy --- AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) : - AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)), + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt), store), m_queueHandle(queueHandle) {} AsyncOpQueueDestroy::~AsyncOpQueueDestroy() {} void -AsyncOpQueueDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +AsyncOpQueueDestroy::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -246,8 +256,9 @@ AsyncOpQueueDestroy::getOpStr() const { AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle, const qpid::broker::DataSource* const data, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(brokerCtxt, store), m_evtHandle(evtHandle), m_data(data), m_txnHandle(txnHandle) @@ -256,7 +267,7 @@ AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle, AsyncOpEventCreate::~AsyncOpEventCreate() {} void -AsyncOpEventCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +AsyncOpEventCreate::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -271,8 +282,9 @@ AsyncOpEventCreate::getOpStr() const { AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(brokerCtxt, store), m_evtHandle(evtHandle), m_txnHandle(txnHandle) {} @@ -280,7 +292,7 @@ AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle, AsyncOpEventDestroy::~AsyncOpEventDestroy() {} void -AsyncOpEventDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +AsyncOpEventDestroy::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -295,15 +307,16 @@ AsyncOpEventDestroy::getOpStr() const { AsyncOpMsgEnqueue::AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(brokerCtxt, store), m_enqHandle(enqHandle), m_txnHandle(txnHandle) {} AsyncOpMsgEnqueue::~AsyncOpMsgEnqueue() {} -void AsyncOpMsgEnqueue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +void AsyncOpMsgEnqueue::executeOp() const { // TODO: Implement store operation here submitResult(); } @@ -317,15 +330,16 @@ const char* AsyncOpMsgEnqueue::getOpStr() const { AsyncOpMsgDequeue::AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : - AsyncOperation(brokerCtxt), + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store) : + AsyncOperation(brokerCtxt, store), m_enqHandle(enqHandle), m_txnHandle(txnHandle) {} AsyncOpMsgDequeue::~AsyncOpMsgDequeue() {} -void AsyncOpMsgDequeue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) { +void AsyncOpMsgDequeue::executeOp() const { // TODO: Implement store operation here submitResult(); } diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h index 2894816ca4..f22cccad07 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.h +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -34,26 +34,30 @@ class AsyncStoreImpl; class AsyncOperation { public: - AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOperation(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store) = 0; + virtual void executeOp() const = 0; boost::shared_ptr<qpid::broker::BrokerAsyncContext> getBrokerContext() const; virtual const char* getOpStr() const = 0; protected: - void submitResult(); + void submitResult() const; void submitResult(const int errNo, - const std::string& errMsg); + const std::string& errMsg) const; private: boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt; +protected: + qpid::broker::AsyncStore* m_store; }; class AsyncOpTxnPrepare: public qpid::asyncStore::AsyncOperation { public: AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt); + boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpTxnPrepare(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::TxnHandle& m_txnHandle; @@ -63,9 +67,10 @@ private: class AsyncOpTxnCommit: public qpid::asyncStore::AsyncOperation { public: AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpTxnCommit(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::TxnHandle& m_txnHandle; @@ -75,9 +80,10 @@ private: class AsyncOpTxnAbort: public qpid::asyncStore::AsyncOperation { public: AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt); + boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpTxnAbort(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::TxnHandle& m_txnHandle; @@ -88,9 +94,10 @@ class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation { public: AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const data, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpConfigCreate(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::ConfigHandle& m_cfgHandle; @@ -101,9 +108,10 @@ private: class AsyncOpConfigDestroy: public qpid::asyncStore::AsyncOperation { public: AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpConfigDestroy(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::ConfigHandle& m_cfgHandle; @@ -114,9 +122,10 @@ class AsyncOpQueueCreate: public qpid::asyncStore::AsyncOperation { public: AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const data, - boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpQueueCreate(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::QueueHandle& m_queueHandle; @@ -127,9 +136,10 @@ private: class AsyncOpQueueFlush: public qpid::asyncStore::AsyncOperation { public: AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpQueueFlush(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::QueueHandle& m_queueHandle; @@ -139,9 +149,10 @@ private: class AsyncOpQueueDestroy: public qpid::asyncStore::AsyncOperation { public: AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle, - boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt); + boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpQueueDestroy(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::QueueHandle& m_queueHandle; @@ -153,9 +164,10 @@ public: AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle, const qpid::broker::DataSource* const data, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpEventCreate(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::EventHandle& m_evtHandle; @@ -168,9 +180,10 @@ class AsyncOpEventDestroy: public qpid::asyncStore::AsyncOperation { public: AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpEventDestroy(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::EventHandle& m_evtHandle; @@ -182,9 +195,10 @@ class AsyncOpMsgEnqueue: public qpid::asyncStore::AsyncOperation { public: AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpMsgEnqueue(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::EnqueueHandle& m_enqHandle; @@ -196,9 +210,10 @@ class AsyncOpMsgDequeue: public qpid::asyncStore::AsyncOperation { public: AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt, + qpid::broker::AsyncStore* store); virtual ~AsyncOpMsgDequeue(); - virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store); + virtual void executeOp() const; virtual const char* getOpStr() const; private: qpid::broker::EnqueueHandle& m_enqHandle; diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 5c62782278..038f4b4238 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -90,7 +90,7 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid, void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt, this)); TxnCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -98,7 +98,7 @@ AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, void AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt, this)); TxnCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -106,7 +106,7 @@ AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, void AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt, this)); TxnCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -145,7 +145,7 @@ void AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -153,7 +153,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, void AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -162,7 +162,7 @@ void AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt, this)); QueueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -170,7 +170,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, void AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt, this)); QueueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -178,7 +178,7 @@ AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, void AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt, this)); QueueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -188,7 +188,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, const qpid::broker::DataSource* const dataSrc, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -197,7 +197,7 @@ void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt, this)); brokerCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -206,7 +206,7 @@ void AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt, this)); QueueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } @@ -215,7 +215,7 @@ void AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) { - boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt)); + boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt, this)); QueueCtxt->setOpStr(op->getOpStr()); m_operations.submit(op); } diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 09fa8c7048..51dcd7392a 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -50,15 +50,7 @@ OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { try { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { - boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext(); - if (bc.get()) { - qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); - if (arq) { - qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); - boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi)); - arq->submit(arh); - } - } + (*i)->executeOp(); // Do store work here } } catch (const std::exception& e) { QPID_LOG(error, "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what()); diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 7009565a7c..e274a4e196 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -90,7 +90,6 @@ public: boost::shared_ptr<TxnAsyncContext>) = 0; virtual void submitAbort(TxnHandle&, boost::shared_ptr<TxnAsyncContext>) = 0; - void testOp() const {} }; // Subclassed by store: diff --git a/cpp/src/qpid/broker/SimpleTxnBuffer.cpp b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp index cb9c54d2d4..7995eae874 100644 --- a/cpp/src/qpid/broker/SimpleTxnBuffer.cpp +++ b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp @@ -164,7 +164,6 @@ SimpleTxnBuffer::asyncLocalCommit() { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, &handleAsyncCommitResult, &m_resultQueue)); - m_store->testOp(); m_store->submitCommit(m_txnHandle, tac); break; } |