summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/asyncStore/AsyncOperation.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/asyncStore/AsyncOperation.cpp')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.cpp363
1 files changed, 291 insertions, 72 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
index b8fdb8b140..a22f803fcd 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
@@ -23,101 +23,320 @@
#include "AsyncOperation.h"
-#include "qpid/Exception.h"
+//#include "qpid/Exception.h"
+#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/AsyncResultHandleImpl.h"
-#include <sstream>
+//#include <sstream>
namespace qpid {
namespace asyncStore {
-AsyncOperation::AsyncOperation() :
- m_op(NONE),
- m_targetHandle(),
- m_dataSrc(0),
- m_txnHandle(0)
+AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ m_brokerCtxt(brokerCtxt)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(0),
- m_txnHandle(0),
- m_brokerCtxt(brokerCtxt)
+AsyncOperation::~AsyncOperation()
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(dataSrc),
- m_txnHandle(0),
- m_brokerCtxt(brokerCtxt)
+boost::shared_ptr<qpid::broker::BrokerAsyncContext> AsyncOperation::getBrokerContext() const
+{
+ return m_brokerCtxt;
+}
+
+void
+AsyncOperation::submitResult()
+{
+ return submitResult(0, "");
+}
+
+void
+AsyncOperation::submitResult(const int errNo,
+ const std::string& errMsg)
+{
+ if (m_brokerCtxt.get()) {
+ qpid::broker::AsyncResultQueue* const arq = m_brokerCtxt->getAsyncResultQueue();
+ if (arq) {
+ qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(errNo, errMsg, m_brokerCtxt);
+ boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
+ arq->submit(arh);
+ }
+ }
+}
+
+
+// --- class AsyncOpTxnPrepare ---
+
+AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(0),
- m_txnHandle(txnHandle),
- m_brokerCtxt(brokerCtxt)
+AsyncOpTxnPrepare::~AsyncOpTxnPrepare() {}
+
+void
+AsyncOpTxnPrepare::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnPrepare::getOpStr() const {
+ return "TXN_PREPARE";
+}
+
+
+
+// --- class AsyncOpTxnCommit ---
+
+AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(dataSrc),
- m_txnHandle(txnHandle),
- m_brokerCtxt(brokerCtxt)
+AsyncOpTxnCommit::~AsyncOpTxnCommit() {}
+
+void
+AsyncOpTxnCommit::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnCommit::getOpStr() const {
+ return "TXN_COMMIT";
+}
+
+
+// --- class AsyncOpTxnAbort ---
+
+AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::~AsyncOperation()
+AsyncOpTxnAbort::~AsyncOpTxnAbort() {}
+
+void
+AsyncOpTxnAbort::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnAbort::getOpStr() const {
+ return "TXN_ABORT";
+}
+
+
+// --- class AsyncOpConfigCreate ---
+
+AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_cfgHandle(cfgHandle),
+ m_data(data)
{}
+AsyncOpConfigCreate::~AsyncOpConfigCreate() {}
+
+void
+AsyncOpConfigCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
const char*
-AsyncOperation::getOpStr() const
-{
- return getOpStr(m_op);
+AsyncOpConfigCreate::getOpStr() const {
+ return "CONFIG_CREATE";
}
-boost::shared_ptr<qpid::broker::BrokerAsyncContext>
-AsyncOperation::getBrokerContext() const
-{
- return m_brokerCtxt;
+
+// --- class AsyncOpConfigDestroy ---
+
+AsyncOpConfigDestroy::AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_cfgHandle(cfgHandle)
+{}
+
+AsyncOpConfigDestroy::~AsyncOpConfigDestroy() {}
+
+void
+AsyncOpConfigDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
}
-//static
const char*
-AsyncOperation::getOpStr(const opCode op)
-{
- switch (op) {
- case NONE: return "<none>";
- case TXN_PREPARE: return "TXN_PREPARE";
- case TXN_COMMIT: return "TXN_COMMIT";
- case TXN_ABORT: return "TXN_ABORT";
- case CONFIG_CREATE: return "CONFIG_CREATE";
- case CONFIG_DESTROY: return "CONFIG_DESTROY";
- case QUEUE_CREATE: return "QUEUE_CREATE";
- case QUEUE_FLUSH: return "QUEUE_FLUSH";
- case QUEUE_DESTROY: return "QUEUE_DESTROY";
- case EVENT_CREATE: return "EVENT_CREATE";
- case EVENT_DESTROY: return "EVENT_DESTROY";
- case MSG_ENQUEUE: return "MSG_ENQUEUE";
- case MSG_DEQUEUE: return "MSG_DEQUEUE";
- }
- std::ostringstream oss;
- oss << "AsyncStore: AsyncOperation::getOpStr(): Unknown op-code \"" << op << "\"";
- throw qpid::Exception(oss.str());
+AsyncOpConfigDestroy::getOpStr() const {
+ return "CONFIG_DESTROY";
+}
+
+
+// --- class AsyncOpQueueCreate ---
+
+AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle),
+ m_data(data)
+{}
+
+AsyncOpQueueCreate::~AsyncOpQueueCreate() {}
+
+void
+AsyncOpQueueCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueCreate::getOpStr() const {
+ return "QUEUE_CREATE";
+}
+
+
+// --- class AsyncOpQueueFlush ---
+
+AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle)
+{}
+
+AsyncOpQueueFlush::~AsyncOpQueueFlush() {}
+
+void
+AsyncOpQueueFlush::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueFlush::getOpStr() const {
+ return "QUEUE_FLUSH";
+}
+
+
+// --- class AsyncOpQueueDestroy ---
+
+AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle)
+{}
+
+AsyncOpQueueDestroy::~AsyncOpQueueDestroy() {}
+
+void
+AsyncOpQueueDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueDestroy::getOpStr() const {
+ return "QUEUE_DESTROY";
+}
+
+
+// --- class AsyncOpEventCreate ---
+
+AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle,
+ const qpid::broker::DataSource* const data,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_evtHandle(evtHandle),
+ m_data(data),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpEventCreate::~AsyncOpEventCreate() {}
+
+void
+AsyncOpEventCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpEventCreate::getOpStr() const {
+ return "EVENT_CREATE";
+}
+
+
+// --- class AsyncOpEventDestroy ---
+
+AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_evtHandle(evtHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpEventDestroy::~AsyncOpEventDestroy() {}
+
+void
+AsyncOpEventDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpEventDestroy::getOpStr() const {
+ return "EVENT_DESTROY";
+}
+
+
+// --- class AsyncOpMsgEnqueue ---
+
+AsyncOpMsgEnqueue::AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_enqHandle(enqHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpMsgEnqueue::~AsyncOpMsgEnqueue() {}
+
+void AsyncOpMsgEnqueue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char* AsyncOpMsgEnqueue::getOpStr() const {
+ return "MSG_ENQUEUE";
+}
+
+
+// --- class AsyncOpMsgDequeue ---
+
+AsyncOpMsgDequeue::AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_enqHandle(enqHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpMsgDequeue::~AsyncOpMsgDequeue() {}
+
+void AsyncOpMsgDequeue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char* AsyncOpMsgDequeue::getOpStr() const {
+ return "MSG_DEQUEUE";
}
}} // namespace qpid::asyncStore