summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.cpp363
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.h208
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp70
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.h2
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueueImpl.h1
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h5
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.cpp26
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.h14
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.cpp57
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.h3
11 files changed, 534 insertions, 217 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
index b8fdb8b140..a22f803fcd 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
@@ -23,101 +23,320 @@
#include "AsyncOperation.h"
-#include "qpid/Exception.h"
+//#include "qpid/Exception.h"
+#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/AsyncResultHandleImpl.h"
-#include <sstream>
+//#include <sstream>
namespace qpid {
namespace asyncStore {
-AsyncOperation::AsyncOperation() :
- m_op(NONE),
- m_targetHandle(),
- m_dataSrc(0),
- m_txnHandle(0)
+AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ m_brokerCtxt(brokerCtxt)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(0),
- m_txnHandle(0),
- m_brokerCtxt(brokerCtxt)
+AsyncOperation::~AsyncOperation()
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(dataSrc),
- m_txnHandle(0),
- m_brokerCtxt(brokerCtxt)
+boost::shared_ptr<qpid::broker::BrokerAsyncContext> AsyncOperation::getBrokerContext() const
+{
+ return m_brokerCtxt;
+}
+
+void
+AsyncOperation::submitResult()
+{
+ return submitResult(0, "");
+}
+
+void
+AsyncOperation::submitResult(const int errNo,
+ const std::string& errMsg)
+{
+ if (m_brokerCtxt.get()) {
+ qpid::broker::AsyncResultQueue* const arq = m_brokerCtxt->getAsyncResultQueue();
+ if (arq) {
+ qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(errNo, errMsg, m_brokerCtxt);
+ boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
+ arq->submit(arh);
+ }
+ }
+}
+
+
+// --- class AsyncOpTxnPrepare ---
+
+AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(0),
- m_txnHandle(txnHandle),
- m_brokerCtxt(brokerCtxt)
+AsyncOpTxnPrepare::~AsyncOpTxnPrepare() {}
+
+void
+AsyncOpTxnPrepare::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnPrepare::getOpStr() const {
+ return "TXN_PREPARE";
+}
+
+
+
+// --- class AsyncOpTxnCommit ---
+
+AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(dataSrc),
- m_txnHandle(txnHandle),
- m_brokerCtxt(brokerCtxt)
+AsyncOpTxnCommit::~AsyncOpTxnCommit() {}
+
+void
+AsyncOpTxnCommit::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnCommit::getOpStr() const {
+ return "TXN_COMMIT";
+}
+
+
+// --- class AsyncOpTxnAbort ---
+
+AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::~AsyncOperation()
+AsyncOpTxnAbort::~AsyncOpTxnAbort() {}
+
+void
+AsyncOpTxnAbort::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnAbort::getOpStr() const {
+ return "TXN_ABORT";
+}
+
+
+// --- class AsyncOpConfigCreate ---
+
+AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_cfgHandle(cfgHandle),
+ m_data(data)
{}
+AsyncOpConfigCreate::~AsyncOpConfigCreate() {}
+
+void
+AsyncOpConfigCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
const char*
-AsyncOperation::getOpStr() const
-{
- return getOpStr(m_op);
+AsyncOpConfigCreate::getOpStr() const {
+ return "CONFIG_CREATE";
}
-boost::shared_ptr<qpid::broker::BrokerAsyncContext>
-AsyncOperation::getBrokerContext() const
-{
- return m_brokerCtxt;
+
+// --- class AsyncOpConfigDestroy ---
+
+AsyncOpConfigDestroy::AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_cfgHandle(cfgHandle)
+{}
+
+AsyncOpConfigDestroy::~AsyncOpConfigDestroy() {}
+
+void
+AsyncOpConfigDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
}
-//static
const char*
-AsyncOperation::getOpStr(const opCode op)
-{
- switch (op) {
- case NONE: return "<none>";
- case TXN_PREPARE: return "TXN_PREPARE";
- case TXN_COMMIT: return "TXN_COMMIT";
- case TXN_ABORT: return "TXN_ABORT";
- case CONFIG_CREATE: return "CONFIG_CREATE";
- case CONFIG_DESTROY: return "CONFIG_DESTROY";
- case QUEUE_CREATE: return "QUEUE_CREATE";
- case QUEUE_FLUSH: return "QUEUE_FLUSH";
- case QUEUE_DESTROY: return "QUEUE_DESTROY";
- case EVENT_CREATE: return "EVENT_CREATE";
- case EVENT_DESTROY: return "EVENT_DESTROY";
- case MSG_ENQUEUE: return "MSG_ENQUEUE";
- case MSG_DEQUEUE: return "MSG_DEQUEUE";
- }
- std::ostringstream oss;
- oss << "AsyncStore: AsyncOperation::getOpStr(): Unknown op-code \"" << op << "\"";
- throw qpid::Exception(oss.str());
+AsyncOpConfigDestroy::getOpStr() const {
+ return "CONFIG_DESTROY";
+}
+
+
+// --- class AsyncOpQueueCreate ---
+
+AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle),
+ m_data(data)
+{}
+
+AsyncOpQueueCreate::~AsyncOpQueueCreate() {}
+
+void
+AsyncOpQueueCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueCreate::getOpStr() const {
+ return "QUEUE_CREATE";
+}
+
+
+// --- class AsyncOpQueueFlush ---
+
+AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle)
+{}
+
+AsyncOpQueueFlush::~AsyncOpQueueFlush() {}
+
+void
+AsyncOpQueueFlush::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueFlush::getOpStr() const {
+ return "QUEUE_FLUSH";
+}
+
+
+// --- class AsyncOpQueueDestroy ---
+
+AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle)
+{}
+
+AsyncOpQueueDestroy::~AsyncOpQueueDestroy() {}
+
+void
+AsyncOpQueueDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueDestroy::getOpStr() const {
+ return "QUEUE_DESTROY";
+}
+
+
+// --- class AsyncOpEventCreate ---
+
+AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle,
+ const qpid::broker::DataSource* const data,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_evtHandle(evtHandle),
+ m_data(data),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpEventCreate::~AsyncOpEventCreate() {}
+
+void
+AsyncOpEventCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpEventCreate::getOpStr() const {
+ return "EVENT_CREATE";
+}
+
+
+// --- class AsyncOpEventDestroy ---
+
+AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_evtHandle(evtHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpEventDestroy::~AsyncOpEventDestroy() {}
+
+void
+AsyncOpEventDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpEventDestroy::getOpStr() const {
+ return "EVENT_DESTROY";
+}
+
+
+// --- class AsyncOpMsgEnqueue ---
+
+AsyncOpMsgEnqueue::AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_enqHandle(enqHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpMsgEnqueue::~AsyncOpMsgEnqueue() {}
+
+void AsyncOpMsgEnqueue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char* AsyncOpMsgEnqueue::getOpStr() const {
+ return "MSG_ENQUEUE";
+}
+
+
+// --- class AsyncOpMsgDequeue ---
+
+AsyncOpMsgDequeue::AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_enqHandle(enqHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpMsgDequeue::~AsyncOpMsgDequeue() {}
+
+void AsyncOpMsgDequeue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char* AsyncOpMsgDequeue::getOpStr() const {
+ return "MSG_DEQUEUE";
}
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h
index cb73ad639b..2b195d7443 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.h
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.h
@@ -26,57 +26,185 @@
#include "qpid/broker/AsyncStore.h"
+#include <boost/shared_ptr.hpp>
+
namespace qpid {
namespace asyncStore {
-class AsyncStoreHandle;
+class AsyncStoreImpl;
class AsyncOperation {
public:
- typedef enum {NONE=0,
- TXN_PREPARE,
- TXN_COMMIT,
- TXN_ABORT,
- CONFIG_CREATE,
- CONFIG_DESTROY,
- QUEUE_CREATE,
- QUEUE_FLUSH,
- QUEUE_DESTROY,
- EVENT_CREATE,
- EVENT_DESTROY,
- MSG_ENQUEUE,
- MSG_DEQUEUE
- } opCode;
-
- AsyncOperation();
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
virtual ~AsyncOperation();
- const char* getOpStr() const;
- static const char* getOpStr(const opCode op);
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store) = 0;
boost::shared_ptr<qpid::broker::BrokerAsyncContext> getBrokerContext() const;
-
+ virtual const char* getOpStr() const = 0;
+protected:
+ void submitResult();
+ void submitResult(const int errNo,
+ const std::string& errMsg);
private:
- opCode m_op;
- const AsyncStoreHandle* m_targetHandle;
- const qpid::broker::DataSource* const m_dataSrc;
- const qpid::broker::TxnHandle* m_txnHandle;
boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt;
};
+
+class AsyncOpTxnPrepare: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpTxnPrepare();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpTxnCommit: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpTxnCommit();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpTxnAbort: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpTxnAbort();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpConfigCreate();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::ConfigHandle& m_cfgHandle;
+ const qpid::broker::DataSource* const m_data;
+};
+
+
+class AsyncOpConfigDestroy: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpConfigDestroy();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::ConfigHandle& m_cfgHandle;
+};
+
+
+class AsyncOpQueueCreate: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpQueueCreate();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::QueueHandle& m_queueHandle;
+ const qpid::broker::DataSource* const m_data;
+};
+
+
+class AsyncOpQueueFlush: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpQueueFlush();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::QueueHandle& m_queueHandle;
+};
+
+
+class AsyncOpQueueDestroy: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpQueueDestroy();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::QueueHandle& m_queueHandle;
+};
+
+
+class AsyncOpEventCreate: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle,
+ const qpid::broker::DataSource* const data,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpEventCreate();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EventHandle& m_evtHandle;
+ const qpid::broker::DataSource* const m_data;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpEventDestroy: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpEventDestroy();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EventHandle& m_evtHandle;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpMsgEnqueue: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpMsgEnqueue();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EnqueueHandle& m_enqHandle;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpMsgDequeue: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpMsgDequeue();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EnqueueHandle& m_enqHandle;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
}} // namespace qpid::asyncStore
#endif // qpid_asyncStore_AsyncOperation_h_
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 6b5c3ac582..e8379b95e2 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -23,6 +23,7 @@
#include "AsyncStoreImpl.h"
+#include "AsyncOperation.h"
#include "ConfigHandleImpl.h"
#include "EnqueueHandleImpl.h"
#include "EventHandleImpl.h"
@@ -37,6 +38,8 @@
#include "qpid/broker/QueueHandle.h"
#include "qpid/broker/TxnHandle.h"
+//#include <boost/make_shared.hpp>
+
namespace qpid {
namespace asyncStore {
@@ -94,9 +97,8 @@ void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE,
- dynamic_cast<AsyncStoreHandle*>(&txnHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -104,9 +106,8 @@ void
AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT,
- dynamic_cast<AsyncStoreHandle*>(&txnHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -114,9 +115,8 @@ void
AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT,
- dynamic_cast<AsyncStoreHandle*>(&txnHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -161,10 +161,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE,
- dynamic_cast<AsyncStoreHandle*>(&cfgHandle),
- dataSrc,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -172,9 +170,8 @@ void
AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY,
- dynamic_cast<AsyncStoreHandle*>(&cfgHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -183,10 +180,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE,
- dynamic_cast<AsyncStoreHandle*>(&queueHandle),
- dataSrc,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -194,9 +189,8 @@ void
AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY,
- dynamic_cast<AsyncStoreHandle*>(&queueHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -204,9 +198,8 @@ void
AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH,
- dynamic_cast<AsyncStoreHandle*>(&queueHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -216,11 +209,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE,
- dynamic_cast<AsyncStoreHandle*>(&eventHandle),
- dataSrc,
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -229,10 +219,8 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY,
- dynamic_cast<AsyncStoreHandle*>(&eventHandle),
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -241,10 +229,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
- dynamic_cast<AsyncStoreHandle*>(&enqHandle),
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -253,10 +239,8 @@ AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
- dynamic_cast<AsyncStoreHandle*>(&enqHandle),
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index dd4e7c1343..b7b8970c39 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -54,7 +54,7 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
try {
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext();
- if (bc) {
+ if (bc.get()) {
qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
if (arq) {
qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc);
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h
index 143ac2ab0c..473b5f1cfb 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.h
+++ b/cpp/src/qpid/asyncStore/OperationQueue.h
@@ -26,7 +26,6 @@
#include "AsyncOperation.h"
-//#include "qpid/broker/AsyncStore.h"
#include "qpid/sys/PollableQueue.h"
namespace qpid {
@@ -43,6 +42,7 @@ private:
typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncOperation> > OpQueue;
OpQueue m_opQueue;
+ // Callback function for pollable queue, defined in qpid::sys::PollableQueue
OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e);
};
diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.h b/cpp/src/qpid/broker/AsyncResultQueueImpl.h
index 01fff6a53a..805057400f 100644
--- a/cpp/src/qpid/broker/AsyncResultQueueImpl.h
+++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.h
@@ -44,6 +44,7 @@ private:
typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncResultHandle> > ResultQueue;
ResultQueue m_resQueue;
+ // Callback function for pollable queue, defined in qpid::sys::PollableQueue
ResultQueue::Batch::const_iterator handle(const ResultQueue::Batch& e);
};
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 7bb6175862..1c8f4b0737 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -46,6 +46,10 @@ public:
virtual ~BrokerAsyncContext() {}
virtual AsyncResultQueue* getAsyncResultQueue() const = 0;
virtual void invokeCallback(const AsyncResultHandle* const) const = 0;
+ void setOpStr(const char* opStr) { m_opStr = opStr; }
+ const char* getOpStr() const { return m_opStr; }
+private:
+ const char* m_opStr;
};
class DataSource {
@@ -83,6 +87,7 @@ public:
boost::shared_ptr<BrokerAsyncContext>) = 0;
virtual void submitAbort(TxnHandle&,
boost::shared_ptr<BrokerAsyncContext>) = 0;
+ void testOp() const {}
};
// Subclassed by store:
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp
index c3d4342993..63e2de2b41 100644
--- a/cpp/src/qpid/broker/TxnAsyncContext.cpp
+++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp
@@ -27,13 +27,9 @@ namespace qpid {
namespace broker {
TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb,
- TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
- qpid::broker::AsyncResultCallback rcb,
- qpid::broker::AsyncResultQueue* const arq):
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq):
m_tb(tb),
- m_th(th),
- m_op(op),
m_rcb(rcb),
m_arq(arq)
{}
@@ -47,24 +43,6 @@ TxnAsyncContext::getTxnBuffer() const
return m_tb;
}
-qpid::asyncStore::AsyncOperation::opCode
-TxnAsyncContext::getOpCode() const
-{
- return m_op;
-}
-
-const char*
-TxnAsyncContext::getOpStr() const
-{
- return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
-}
-
-TxnHandle&
-TxnAsyncContext::getTransactionContext() const
-{
- return m_th;
-}
-
AsyncResultQueue*
TxnAsyncContext::getAsyncResultQueue() const
{
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h
index 810c46429c..0c35b110a8 100644
--- a/cpp/src/qpid/broker/TxnAsyncContext.h
+++ b/cpp/src/qpid/broker/TxnAsyncContext.h
@@ -29,6 +29,9 @@
#include "qpid/asyncStore/AsyncOperation.h"
namespace qpid {
+namespace asyncStore {
+class AsyncOperation;
+}
namespace broker {
class TxnHandle;
@@ -39,15 +42,10 @@ class TxnAsyncContext: public BrokerAsyncContext
{
public:
TxnAsyncContext(TxnBuffer* const tb,
- TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
- qpid::broker::AsyncResultCallback rcb,
- qpid::broker::AsyncResultQueue* const arq);
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq);
virtual ~TxnAsyncContext();
TxnBuffer* getTxnBuffer() const;
- qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
- const char* getOpStr() const;
- TxnHandle& getTransactionContext() const;
// --- Interface BrokerAsyncContext ---
AsyncResultQueue* getAsyncResultQueue() const;
@@ -55,8 +53,6 @@ public:
private:
TxnBuffer* const m_tb;
- TxnHandle& m_th;
- const qpid::asyncStore::AsyncOperation::opCode m_op;
AsyncResultCallback m_rcb;
AsyncResultQueue* const m_arq;
};
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp
index 425d725e9e..ba00293452 100644
--- a/cpp/src/qpid/broker/TxnBuffer.cpp
+++ b/cpp/src/qpid/broker/TxnBuffer.cpp
@@ -96,26 +96,6 @@ TxnBuffer::commitLocal(AsyncTransaction* const store)
return false;
}
-// static
-void
-TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh)
-{
- if (arh) {
- boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
- if (arh->getErrNo()) {
- QPID_LOG(error, "TxnBuffer::handleAsyncResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
- << " (" << arh->getErrMsg() << ")");
- tac->getTxnBuffer()->asyncLocalAbort();
- } else {
- if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) {
- tac->getTxnBuffer()->asyncLocalAbort();
- } else {
- tac->getTxnBuffer()->asyncLocalCommit();
- }
- }
- }
-}
-
void
TxnBuffer::asyncLocalCommit()
{
@@ -130,10 +110,9 @@ TxnBuffer::asyncLocalCommit()
m_state = COMMIT;
{
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
- m_txnHandle,
- qpid::asyncStore::AsyncOperation::TXN_COMMIT,
- &handleAsyncResult,
+ &handleAsyncCommitResult,
&m_resultQueue));
+ m_store->testOp();
m_store->submitCommit(m_txnHandle, tac);
}
break;
@@ -147,6 +126,21 @@ TxnBuffer::asyncLocalCommit()
}
}
+//static
+void
+TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ QPID_LOG(error, "TxnBuffer::handleAsyncCommitResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
+ << " (" << arh->getErrMsg() << ")");
+ tac->getTxnBuffer()->asyncLocalAbort();
+ } else {
+ tac->getTxnBuffer()->asyncLocalCommit();
+ }
+ }
+}
+
void
TxnBuffer::asyncLocalAbort()
{
@@ -158,9 +152,7 @@ TxnBuffer::asyncLocalAbort()
m_state = ROLLBACK;
{
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
- m_txnHandle,
- qpid::asyncStore::AsyncOperation::TXN_ABORT,
- &handleAsyncResult,
+ &handleAsyncAbortResult,
&m_resultQueue));
m_store->submitCommit(m_txnHandle, tac);
}
@@ -173,4 +165,17 @@ TxnBuffer::asyncLocalAbort()
}
}
+//static
+void
+TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
+ << " (" << arh->getErrMsg() << ")");
+ }
+ tac->getTxnBuffer()->asyncLocalAbort();
+ }
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h
index cd78846b71..bd9743cad7 100644
--- a/cpp/src/qpid/broker/TxnBuffer.h
+++ b/cpp/src/qpid/broker/TxnBuffer.h
@@ -51,9 +51,10 @@ public:
bool commitLocal(AsyncTransaction* const store);
// --- Async operations ---
- static void handleAsyncResult(const AsyncResultHandle* const arh);
void asyncLocalCommit();
+ static void handleAsyncCommitResult(const AsyncResultHandle* const arh);
void asyncLocalAbort();
+ static void handleAsyncAbortResult(const AsyncResultHandle* const arh);
private:
std::vector<boost::shared_ptr<TxnOp> > m_ops;