summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/asyncStore
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/asyncStore')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.cpp363
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.h208
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp70
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.h2
5 files changed, 488 insertions, 157 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
index b8fdb8b140..a22f803fcd 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
@@ -23,101 +23,320 @@
#include "AsyncOperation.h"
-#include "qpid/Exception.h"
+//#include "qpid/Exception.h"
+#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/AsyncResultHandleImpl.h"
-#include <sstream>
+//#include <sstream>
namespace qpid {
namespace asyncStore {
-AsyncOperation::AsyncOperation() :
- m_op(NONE),
- m_targetHandle(),
- m_dataSrc(0),
- m_txnHandle(0)
+AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ m_brokerCtxt(brokerCtxt)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(0),
- m_txnHandle(0),
- m_brokerCtxt(brokerCtxt)
+AsyncOperation::~AsyncOperation()
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(dataSrc),
- m_txnHandle(0),
- m_brokerCtxt(brokerCtxt)
+boost::shared_ptr<qpid::broker::BrokerAsyncContext> AsyncOperation::getBrokerContext() const
+{
+ return m_brokerCtxt;
+}
+
+void
+AsyncOperation::submitResult()
+{
+ return submitResult(0, "");
+}
+
+void
+AsyncOperation::submitResult(const int errNo,
+ const std::string& errMsg)
+{
+ if (m_brokerCtxt.get()) {
+ qpid::broker::AsyncResultQueue* const arq = m_brokerCtxt->getAsyncResultQueue();
+ if (arq) {
+ qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(errNo, errMsg, m_brokerCtxt);
+ boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
+ arq->submit(arh);
+ }
+ }
+}
+
+
+// --- class AsyncOpTxnPrepare ---
+
+AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(0),
- m_txnHandle(txnHandle),
- m_brokerCtxt(brokerCtxt)
+AsyncOpTxnPrepare::~AsyncOpTxnPrepare() {}
+
+void
+AsyncOpTxnPrepare::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnPrepare::getOpStr() const {
+ return "TXN_PREPARE";
+}
+
+
+
+// --- class AsyncOpTxnCommit ---
+
+AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(dataSrc),
- m_txnHandle(txnHandle),
- m_brokerCtxt(brokerCtxt)
+AsyncOpTxnCommit::~AsyncOpTxnCommit() {}
+
+void
+AsyncOpTxnCommit::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnCommit::getOpStr() const {
+ return "TXN_COMMIT";
+}
+
+
+// --- class AsyncOpTxnAbort ---
+
+AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::~AsyncOperation()
+AsyncOpTxnAbort::~AsyncOpTxnAbort() {}
+
+void
+AsyncOpTxnAbort::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnAbort::getOpStr() const {
+ return "TXN_ABORT";
+}
+
+
+// --- class AsyncOpConfigCreate ---
+
+AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_cfgHandle(cfgHandle),
+ m_data(data)
{}
+AsyncOpConfigCreate::~AsyncOpConfigCreate() {}
+
+void
+AsyncOpConfigCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
const char*
-AsyncOperation::getOpStr() const
-{
- return getOpStr(m_op);
+AsyncOpConfigCreate::getOpStr() const {
+ return "CONFIG_CREATE";
}
-boost::shared_ptr<qpid::broker::BrokerAsyncContext>
-AsyncOperation::getBrokerContext() const
-{
- return m_brokerCtxt;
+
+// --- class AsyncOpConfigDestroy ---
+
+AsyncOpConfigDestroy::AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_cfgHandle(cfgHandle)
+{}
+
+AsyncOpConfigDestroy::~AsyncOpConfigDestroy() {}
+
+void
+AsyncOpConfigDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
}
-//static
const char*
-AsyncOperation::getOpStr(const opCode op)
-{
- switch (op) {
- case NONE: return "<none>";
- case TXN_PREPARE: return "TXN_PREPARE";
- case TXN_COMMIT: return "TXN_COMMIT";
- case TXN_ABORT: return "TXN_ABORT";
- case CONFIG_CREATE: return "CONFIG_CREATE";
- case CONFIG_DESTROY: return "CONFIG_DESTROY";
- case QUEUE_CREATE: return "QUEUE_CREATE";
- case QUEUE_FLUSH: return "QUEUE_FLUSH";
- case QUEUE_DESTROY: return "QUEUE_DESTROY";
- case EVENT_CREATE: return "EVENT_CREATE";
- case EVENT_DESTROY: return "EVENT_DESTROY";
- case MSG_ENQUEUE: return "MSG_ENQUEUE";
- case MSG_DEQUEUE: return "MSG_DEQUEUE";
- }
- std::ostringstream oss;
- oss << "AsyncStore: AsyncOperation::getOpStr(): Unknown op-code \"" << op << "\"";
- throw qpid::Exception(oss.str());
+AsyncOpConfigDestroy::getOpStr() const {
+ return "CONFIG_DESTROY";
+}
+
+
+// --- class AsyncOpQueueCreate ---
+
+AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle),
+ m_data(data)
+{}
+
+AsyncOpQueueCreate::~AsyncOpQueueCreate() {}
+
+void
+AsyncOpQueueCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueCreate::getOpStr() const {
+ return "QUEUE_CREATE";
+}
+
+
+// --- class AsyncOpQueueFlush ---
+
+AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle)
+{}
+
+AsyncOpQueueFlush::~AsyncOpQueueFlush() {}
+
+void
+AsyncOpQueueFlush::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueFlush::getOpStr() const {
+ return "QUEUE_FLUSH";
+}
+
+
+// --- class AsyncOpQueueDestroy ---
+
+AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle)
+{}
+
+AsyncOpQueueDestroy::~AsyncOpQueueDestroy() {}
+
+void
+AsyncOpQueueDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueDestroy::getOpStr() const {
+ return "QUEUE_DESTROY";
+}
+
+
+// --- class AsyncOpEventCreate ---
+
+AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle,
+ const qpid::broker::DataSource* const data,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_evtHandle(evtHandle),
+ m_data(data),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpEventCreate::~AsyncOpEventCreate() {}
+
+void
+AsyncOpEventCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpEventCreate::getOpStr() const {
+ return "EVENT_CREATE";
+}
+
+
+// --- class AsyncOpEventDestroy ---
+
+AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_evtHandle(evtHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpEventDestroy::~AsyncOpEventDestroy() {}
+
+void
+AsyncOpEventDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpEventDestroy::getOpStr() const {
+ return "EVENT_DESTROY";
+}
+
+
+// --- class AsyncOpMsgEnqueue ---
+
+AsyncOpMsgEnqueue::AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_enqHandle(enqHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpMsgEnqueue::~AsyncOpMsgEnqueue() {}
+
+void AsyncOpMsgEnqueue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char* AsyncOpMsgEnqueue::getOpStr() const {
+ return "MSG_ENQUEUE";
+}
+
+
+// --- class AsyncOpMsgDequeue ---
+
+AsyncOpMsgDequeue::AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_enqHandle(enqHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpMsgDequeue::~AsyncOpMsgDequeue() {}
+
+void AsyncOpMsgDequeue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char* AsyncOpMsgDequeue::getOpStr() const {
+ return "MSG_DEQUEUE";
}
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h
index cb73ad639b..2b195d7443 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.h
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.h
@@ -26,57 +26,185 @@
#include "qpid/broker/AsyncStore.h"
+#include <boost/shared_ptr.hpp>
+
namespace qpid {
namespace asyncStore {
-class AsyncStoreHandle;
+class AsyncStoreImpl;
class AsyncOperation {
public:
- typedef enum {NONE=0,
- TXN_PREPARE,
- TXN_COMMIT,
- TXN_ABORT,
- CONFIG_CREATE,
- CONFIG_DESTROY,
- QUEUE_CREATE,
- QUEUE_FLUSH,
- QUEUE_DESTROY,
- EVENT_CREATE,
- EVENT_DESTROY,
- MSG_ENQUEUE,
- MSG_DEQUEUE
- } opCode;
-
- AsyncOperation();
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
virtual ~AsyncOperation();
- const char* getOpStr() const;
- static const char* getOpStr(const opCode op);
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store) = 0;
boost::shared_ptr<qpid::broker::BrokerAsyncContext> getBrokerContext() const;
-
+ virtual const char* getOpStr() const = 0;
+protected:
+ void submitResult();
+ void submitResult(const int errNo,
+ const std::string& errMsg);
private:
- opCode m_op;
- const AsyncStoreHandle* m_targetHandle;
- const qpid::broker::DataSource* const m_dataSrc;
- const qpid::broker::TxnHandle* m_txnHandle;
boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt;
};
+
+class AsyncOpTxnPrepare: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpTxnPrepare();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpTxnCommit: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpTxnCommit();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpTxnAbort: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpTxnAbort();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpConfigCreate();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::ConfigHandle& m_cfgHandle;
+ const qpid::broker::DataSource* const m_data;
+};
+
+
+class AsyncOpConfigDestroy: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpConfigDestroy();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::ConfigHandle& m_cfgHandle;
+};
+
+
+class AsyncOpQueueCreate: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpQueueCreate();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::QueueHandle& m_queueHandle;
+ const qpid::broker::DataSource* const m_data;
+};
+
+
+class AsyncOpQueueFlush: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpQueueFlush();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::QueueHandle& m_queueHandle;
+};
+
+
+class AsyncOpQueueDestroy: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpQueueDestroy();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::QueueHandle& m_queueHandle;
+};
+
+
+class AsyncOpEventCreate: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle,
+ const qpid::broker::DataSource* const data,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpEventCreate();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EventHandle& m_evtHandle;
+ const qpid::broker::DataSource* const m_data;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpEventDestroy: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpEventDestroy();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EventHandle& m_evtHandle;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpMsgEnqueue: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpMsgEnqueue();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EnqueueHandle& m_enqHandle;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpMsgDequeue: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpMsgDequeue();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EnqueueHandle& m_enqHandle;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
}} // namespace qpid::asyncStore
#endif // qpid_asyncStore_AsyncOperation_h_
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 6b5c3ac582..e8379b95e2 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -23,6 +23,7 @@
#include "AsyncStoreImpl.h"
+#include "AsyncOperation.h"
#include "ConfigHandleImpl.h"
#include "EnqueueHandleImpl.h"
#include "EventHandleImpl.h"
@@ -37,6 +38,8 @@
#include "qpid/broker/QueueHandle.h"
#include "qpid/broker/TxnHandle.h"
+//#include <boost/make_shared.hpp>
+
namespace qpid {
namespace asyncStore {
@@ -94,9 +97,8 @@ void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE,
- dynamic_cast<AsyncStoreHandle*>(&txnHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -104,9 +106,8 @@ void
AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT,
- dynamic_cast<AsyncStoreHandle*>(&txnHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -114,9 +115,8 @@ void
AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT,
- dynamic_cast<AsyncStoreHandle*>(&txnHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -161,10 +161,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE,
- dynamic_cast<AsyncStoreHandle*>(&cfgHandle),
- dataSrc,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -172,9 +170,8 @@ void
AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY,
- dynamic_cast<AsyncStoreHandle*>(&cfgHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -183,10 +180,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE,
- dynamic_cast<AsyncStoreHandle*>(&queueHandle),
- dataSrc,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -194,9 +189,8 @@ void
AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY,
- dynamic_cast<AsyncStoreHandle*>(&queueHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -204,9 +198,8 @@ void
AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH,
- dynamic_cast<AsyncStoreHandle*>(&queueHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -216,11 +209,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE,
- dynamic_cast<AsyncStoreHandle*>(&eventHandle),
- dataSrc,
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -229,10 +219,8 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY,
- dynamic_cast<AsyncStoreHandle*>(&eventHandle),
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -241,10 +229,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
- dynamic_cast<AsyncStoreHandle*>(&enqHandle),
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -253,10 +239,8 @@ AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
- dynamic_cast<AsyncStoreHandle*>(&enqHandle),
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index dd4e7c1343..b7b8970c39 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -54,7 +54,7 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
try {
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext();
- if (bc) {
+ if (bc.get()) {
qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
if (arq) {
qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc);
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h
index 143ac2ab0c..473b5f1cfb 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.h
+++ b/cpp/src/qpid/asyncStore/OperationQueue.h
@@ -26,7 +26,6 @@
#include "AsyncOperation.h"
-//#include "qpid/broker/AsyncStore.h"
#include "qpid/sys/PollableQueue.h"
namespace qpid {
@@ -43,6 +42,7 @@ private:
typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncOperation> > OpQueue;
OpQueue m_opQueue;
+ // Callback function for pollable queue, defined in qpid::sys::PollableQueue
OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e);
};