summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-07-20 12:55:20 +0000
committerKim van der Riet <kpvdr@apache.org>2012-07-20 12:55:20 +0000
commit2e437e1569009d8e8ed3ed896d751994e2e85d74 (patch)
tree28459e48638bfcd3a7e565c37165b3fab714d484
parentc94c9b5333c06c03deb6a6dcb1a91ecdf111b481 (diff)
downloadqpid-python-2e437e1569009d8e8ed3ed896d751994e2e85d74.tar.gz
QPID-3858: WIP: Created many async operation classes for each op instead of a single class with op codes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1363759 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.cpp363
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.h208
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp70
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp2
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.h2
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueueImpl.h1
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h5
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.cpp26
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.h14
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.cpp57
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.h3
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp14
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp16
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h7
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp133
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h16
17 files changed, 613 insertions, 328 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
index b8fdb8b140..a22f803fcd 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
@@ -23,101 +23,320 @@
#include "AsyncOperation.h"
-#include "qpid/Exception.h"
+//#include "qpid/Exception.h"
+#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/AsyncResultHandleImpl.h"
-#include <sstream>
+//#include <sstream>
namespace qpid {
namespace asyncStore {
-AsyncOperation::AsyncOperation() :
- m_op(NONE),
- m_targetHandle(),
- m_dataSrc(0),
- m_txnHandle(0)
+AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ m_brokerCtxt(brokerCtxt)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(0),
- m_txnHandle(0),
- m_brokerCtxt(brokerCtxt)
+AsyncOperation::~AsyncOperation()
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(dataSrc),
- m_txnHandle(0),
- m_brokerCtxt(brokerCtxt)
+boost::shared_ptr<qpid::broker::BrokerAsyncContext> AsyncOperation::getBrokerContext() const
+{
+ return m_brokerCtxt;
+}
+
+void
+AsyncOperation::submitResult()
+{
+ return submitResult(0, "");
+}
+
+void
+AsyncOperation::submitResult(const int errNo,
+ const std::string& errMsg)
+{
+ if (m_brokerCtxt.get()) {
+ qpid::broker::AsyncResultQueue* const arq = m_brokerCtxt->getAsyncResultQueue();
+ if (arq) {
+ qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(errNo, errMsg, m_brokerCtxt);
+ boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
+ arq->submit(arh);
+ }
+ }
+}
+
+
+// --- class AsyncOpTxnPrepare ---
+
+AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(0),
- m_txnHandle(txnHandle),
- m_brokerCtxt(brokerCtxt)
+AsyncOpTxnPrepare::~AsyncOpTxnPrepare() {}
+
+void
+AsyncOpTxnPrepare::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnPrepare::getOpStr() const {
+ return "TXN_PREPARE";
+}
+
+
+
+// --- class AsyncOpTxnCommit ---
+
+AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_op(op),
- m_targetHandle(th),
- m_dataSrc(dataSrc),
- m_txnHandle(txnHandle),
- m_brokerCtxt(brokerCtxt)
+AsyncOpTxnCommit::~AsyncOpTxnCommit() {}
+
+void
+AsyncOpTxnCommit::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnCommit::getOpStr() const {
+ return "TXN_COMMIT";
+}
+
+
+// --- class AsyncOpTxnAbort ---
+
+AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_txnHandle(txnHandle)
{}
-AsyncOperation::~AsyncOperation()
+AsyncOpTxnAbort::~AsyncOpTxnAbort() {}
+
+void
+AsyncOpTxnAbort::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpTxnAbort::getOpStr() const {
+ return "TXN_ABORT";
+}
+
+
+// --- class AsyncOpConfigCreate ---
+
+AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_cfgHandle(cfgHandle),
+ m_data(data)
{}
+AsyncOpConfigCreate::~AsyncOpConfigCreate() {}
+
+void
+AsyncOpConfigCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
const char*
-AsyncOperation::getOpStr() const
-{
- return getOpStr(m_op);
+AsyncOpConfigCreate::getOpStr() const {
+ return "CONFIG_CREATE";
}
-boost::shared_ptr<qpid::broker::BrokerAsyncContext>
-AsyncOperation::getBrokerContext() const
-{
- return m_brokerCtxt;
+
+// --- class AsyncOpConfigDestroy ---
+
+AsyncOpConfigDestroy::AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_cfgHandle(cfgHandle)
+{}
+
+AsyncOpConfigDestroy::~AsyncOpConfigDestroy() {}
+
+void
+AsyncOpConfigDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
}
-//static
const char*
-AsyncOperation::getOpStr(const opCode op)
-{
- switch (op) {
- case NONE: return "<none>";
- case TXN_PREPARE: return "TXN_PREPARE";
- case TXN_COMMIT: return "TXN_COMMIT";
- case TXN_ABORT: return "TXN_ABORT";
- case CONFIG_CREATE: return "CONFIG_CREATE";
- case CONFIG_DESTROY: return "CONFIG_DESTROY";
- case QUEUE_CREATE: return "QUEUE_CREATE";
- case QUEUE_FLUSH: return "QUEUE_FLUSH";
- case QUEUE_DESTROY: return "QUEUE_DESTROY";
- case EVENT_CREATE: return "EVENT_CREATE";
- case EVENT_DESTROY: return "EVENT_DESTROY";
- case MSG_ENQUEUE: return "MSG_ENQUEUE";
- case MSG_DEQUEUE: return "MSG_DEQUEUE";
- }
- std::ostringstream oss;
- oss << "AsyncStore: AsyncOperation::getOpStr(): Unknown op-code \"" << op << "\"";
- throw qpid::Exception(oss.str());
+AsyncOpConfigDestroy::getOpStr() const {
+ return "CONFIG_DESTROY";
+}
+
+
+// --- class AsyncOpQueueCreate ---
+
+AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle),
+ m_data(data)
+{}
+
+AsyncOpQueueCreate::~AsyncOpQueueCreate() {}
+
+void
+AsyncOpQueueCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueCreate::getOpStr() const {
+ return "QUEUE_CREATE";
+}
+
+
+// --- class AsyncOpQueueFlush ---
+
+AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle)
+{}
+
+AsyncOpQueueFlush::~AsyncOpQueueFlush() {}
+
+void
+AsyncOpQueueFlush::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueFlush::getOpStr() const {
+ return "QUEUE_FLUSH";
+}
+
+
+// --- class AsyncOpQueueDestroy ---
+
+AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_queueHandle(queueHandle)
+{}
+
+AsyncOpQueueDestroy::~AsyncOpQueueDestroy() {}
+
+void
+AsyncOpQueueDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpQueueDestroy::getOpStr() const {
+ return "QUEUE_DESTROY";
+}
+
+
+// --- class AsyncOpEventCreate ---
+
+AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle,
+ const qpid::broker::DataSource* const data,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_evtHandle(evtHandle),
+ m_data(data),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpEventCreate::~AsyncOpEventCreate() {}
+
+void
+AsyncOpEventCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpEventCreate::getOpStr() const {
+ return "EVENT_CREATE";
+}
+
+
+// --- class AsyncOpEventDestroy ---
+
+AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_evtHandle(evtHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpEventDestroy::~AsyncOpEventDestroy() {}
+
+void
+AsyncOpEventDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char*
+AsyncOpEventDestroy::getOpStr() const {
+ return "EVENT_DESTROY";
+}
+
+
+// --- class AsyncOpMsgEnqueue ---
+
+AsyncOpMsgEnqueue::AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_enqHandle(enqHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpMsgEnqueue::~AsyncOpMsgEnqueue() {}
+
+void AsyncOpMsgEnqueue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char* AsyncOpMsgEnqueue::getOpStr() const {
+ return "MSG_ENQUEUE";
+}
+
+
+// --- class AsyncOpMsgDequeue ---
+
+AsyncOpMsgDequeue::AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
+ AsyncOperation(brokerCtxt),
+ m_enqHandle(enqHandle),
+ m_txnHandle(txnHandle)
+{}
+
+AsyncOpMsgDequeue::~AsyncOpMsgDequeue() {}
+
+void AsyncOpMsgDequeue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+ // TODO: Implement store operation here
+ submitResult();
+}
+
+const char* AsyncOpMsgDequeue::getOpStr() const {
+ return "MSG_DEQUEUE";
}
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h
index cb73ad639b..2b195d7443 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.h
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.h
@@ -26,57 +26,185 @@
#include "qpid/broker/AsyncStore.h"
+#include <boost/shared_ptr.hpp>
+
namespace qpid {
namespace asyncStore {
-class AsyncStoreHandle;
+class AsyncStoreImpl;
class AsyncOperation {
public:
- typedef enum {NONE=0,
- TXN_PREPARE,
- TXN_COMMIT,
- TXN_ABORT,
- CONFIG_CREATE,
- CONFIG_DESTROY,
- QUEUE_CREATE,
- QUEUE_FLUSH,
- QUEUE_DESTROY,
- EVENT_CREATE,
- EVENT_DESTROY,
- MSG_ENQUEUE,
- MSG_DEQUEUE
- } opCode;
-
- AsyncOperation();
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
- AsyncOperation(const opCode op,
- const AsyncStoreHandle* th,
- const qpid::broker::DataSource* const dataSrc,
- const qpid::broker::TxnHandle* txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
virtual ~AsyncOperation();
- const char* getOpStr() const;
- static const char* getOpStr(const opCode op);
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store) = 0;
boost::shared_ptr<qpid::broker::BrokerAsyncContext> getBrokerContext() const;
-
+ virtual const char* getOpStr() const = 0;
+protected:
+ void submitResult();
+ void submitResult(const int errNo,
+ const std::string& errMsg);
private:
- opCode m_op;
- const AsyncStoreHandle* m_targetHandle;
- const qpid::broker::DataSource* const m_dataSrc;
- const qpid::broker::TxnHandle* m_txnHandle;
boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt;
};
+
+class AsyncOpTxnPrepare: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpTxnPrepare();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpTxnCommit: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpTxnCommit();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpTxnAbort: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpTxnAbort();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpConfigCreate();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::ConfigHandle& m_cfgHandle;
+ const qpid::broker::DataSource* const m_data;
+};
+
+
+class AsyncOpConfigDestroy: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpConfigDestroy();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::ConfigHandle& m_cfgHandle;
+};
+
+
+class AsyncOpQueueCreate: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
+ const qpid::broker::DataSource* const data,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpQueueCreate();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::QueueHandle& m_queueHandle;
+ const qpid::broker::DataSource* const m_data;
+};
+
+
+class AsyncOpQueueFlush: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpQueueFlush();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::QueueHandle& m_queueHandle;
+};
+
+
+class AsyncOpQueueDestroy: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpQueueDestroy();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::QueueHandle& m_queueHandle;
+};
+
+
+class AsyncOpEventCreate: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle,
+ const qpid::broker::DataSource* const data,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpEventCreate();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EventHandle& m_evtHandle;
+ const qpid::broker::DataSource* const m_data;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpEventDestroy: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpEventDestroy();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EventHandle& m_evtHandle;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpMsgEnqueue: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpMsgEnqueue();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EnqueueHandle& m_enqHandle;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
+
+class AsyncOpMsgDequeue: public qpid::asyncStore::AsyncOperation {
+public:
+ AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle,
+ qpid::broker::TxnHandle& txnHandle,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ virtual ~AsyncOpMsgDequeue();
+ virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual const char* getOpStr() const;
+private:
+ qpid::broker::EnqueueHandle& m_enqHandle;
+ qpid::broker::TxnHandle& m_txnHandle;
+};
+
}} // namespace qpid::asyncStore
#endif // qpid_asyncStore_AsyncOperation_h_
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 6b5c3ac582..e8379b95e2 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -23,6 +23,7 @@
#include "AsyncStoreImpl.h"
+#include "AsyncOperation.h"
#include "ConfigHandleImpl.h"
#include "EnqueueHandleImpl.h"
#include "EventHandleImpl.h"
@@ -37,6 +38,8 @@
#include "qpid/broker/QueueHandle.h"
#include "qpid/broker/TxnHandle.h"
+//#include <boost/make_shared.hpp>
+
namespace qpid {
namespace asyncStore {
@@ -94,9 +97,8 @@ void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE,
- dynamic_cast<AsyncStoreHandle*>(&txnHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -104,9 +106,8 @@ void
AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT,
- dynamic_cast<AsyncStoreHandle*>(&txnHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -114,9 +115,8 @@ void
AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT,
- dynamic_cast<AsyncStoreHandle*>(&txnHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -161,10 +161,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE,
- dynamic_cast<AsyncStoreHandle*>(&cfgHandle),
- dataSrc,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -172,9 +170,8 @@ void
AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY,
- dynamic_cast<AsyncStoreHandle*>(&cfgHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -183,10 +180,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE,
- dynamic_cast<AsyncStoreHandle*>(&queueHandle),
- dataSrc,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -194,9 +189,8 @@ void
AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY,
- dynamic_cast<AsyncStoreHandle*>(&queueHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -204,9 +198,8 @@ void
AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH,
- dynamic_cast<AsyncStoreHandle*>(&queueHandle),
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -216,11 +209,8 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE,
- dynamic_cast<AsyncStoreHandle*>(&eventHandle),
- dataSrc,
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -229,10 +219,8 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY,
- dynamic_cast<AsyncStoreHandle*>(&eventHandle),
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -241,10 +229,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
- dynamic_cast<AsyncStoreHandle*>(&enqHandle),
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -253,10 +239,8 @@ AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
- dynamic_cast<AsyncStoreHandle*>(&enqHandle),
- &txnHandle,
- brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, brokerCtxt));
+ brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index dd4e7c1343..b7b8970c39 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -54,7 +54,7 @@ OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
try {
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext();
- if (bc) {
+ if (bc.get()) {
qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
if (arq) {
qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc);
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h
index 143ac2ab0c..473b5f1cfb 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.h
+++ b/cpp/src/qpid/asyncStore/OperationQueue.h
@@ -26,7 +26,6 @@
#include "AsyncOperation.h"
-//#include "qpid/broker/AsyncStore.h"
#include "qpid/sys/PollableQueue.h"
namespace qpid {
@@ -43,6 +42,7 @@ private:
typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncOperation> > OpQueue;
OpQueue m_opQueue;
+ // Callback function for pollable queue, defined in qpid::sys::PollableQueue
OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e);
};
diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.h b/cpp/src/qpid/broker/AsyncResultQueueImpl.h
index 01fff6a53a..805057400f 100644
--- a/cpp/src/qpid/broker/AsyncResultQueueImpl.h
+++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.h
@@ -44,6 +44,7 @@ private:
typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncResultHandle> > ResultQueue;
ResultQueue m_resQueue;
+ // Callback function for pollable queue, defined in qpid::sys::PollableQueue
ResultQueue::Batch::const_iterator handle(const ResultQueue::Batch& e);
};
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 7bb6175862..1c8f4b0737 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -46,6 +46,10 @@ public:
virtual ~BrokerAsyncContext() {}
virtual AsyncResultQueue* getAsyncResultQueue() const = 0;
virtual void invokeCallback(const AsyncResultHandle* const) const = 0;
+ void setOpStr(const char* opStr) { m_opStr = opStr; }
+ const char* getOpStr() const { return m_opStr; }
+private:
+ const char* m_opStr;
};
class DataSource {
@@ -83,6 +87,7 @@ public:
boost::shared_ptr<BrokerAsyncContext>) = 0;
virtual void submitAbort(TxnHandle&,
boost::shared_ptr<BrokerAsyncContext>) = 0;
+ void testOp() const {}
};
// Subclassed by store:
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp
index c3d4342993..63e2de2b41 100644
--- a/cpp/src/qpid/broker/TxnAsyncContext.cpp
+++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp
@@ -27,13 +27,9 @@ namespace qpid {
namespace broker {
TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb,
- TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
- qpid::broker::AsyncResultCallback rcb,
- qpid::broker::AsyncResultQueue* const arq):
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq):
m_tb(tb),
- m_th(th),
- m_op(op),
m_rcb(rcb),
m_arq(arq)
{}
@@ -47,24 +43,6 @@ TxnAsyncContext::getTxnBuffer() const
return m_tb;
}
-qpid::asyncStore::AsyncOperation::opCode
-TxnAsyncContext::getOpCode() const
-{
- return m_op;
-}
-
-const char*
-TxnAsyncContext::getOpStr() const
-{
- return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
-}
-
-TxnHandle&
-TxnAsyncContext::getTransactionContext() const
-{
- return m_th;
-}
-
AsyncResultQueue*
TxnAsyncContext::getAsyncResultQueue() const
{
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h
index 810c46429c..0c35b110a8 100644
--- a/cpp/src/qpid/broker/TxnAsyncContext.h
+++ b/cpp/src/qpid/broker/TxnAsyncContext.h
@@ -29,6 +29,9 @@
#include "qpid/asyncStore/AsyncOperation.h"
namespace qpid {
+namespace asyncStore {
+class AsyncOperation;
+}
namespace broker {
class TxnHandle;
@@ -39,15 +42,10 @@ class TxnAsyncContext: public BrokerAsyncContext
{
public:
TxnAsyncContext(TxnBuffer* const tb,
- TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
- qpid::broker::AsyncResultCallback rcb,
- qpid::broker::AsyncResultQueue* const arq);
+ AsyncResultCallback rcb,
+ AsyncResultQueue* const arq);
virtual ~TxnAsyncContext();
TxnBuffer* getTxnBuffer() const;
- qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
- const char* getOpStr() const;
- TxnHandle& getTransactionContext() const;
// --- Interface BrokerAsyncContext ---
AsyncResultQueue* getAsyncResultQueue() const;
@@ -55,8 +53,6 @@ public:
private:
TxnBuffer* const m_tb;
- TxnHandle& m_th;
- const qpid::asyncStore::AsyncOperation::opCode m_op;
AsyncResultCallback m_rcb;
AsyncResultQueue* const m_arq;
};
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp
index 425d725e9e..ba00293452 100644
--- a/cpp/src/qpid/broker/TxnBuffer.cpp
+++ b/cpp/src/qpid/broker/TxnBuffer.cpp
@@ -96,26 +96,6 @@ TxnBuffer::commitLocal(AsyncTransaction* const store)
return false;
}
-// static
-void
-TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh)
-{
- if (arh) {
- boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
- if (arh->getErrNo()) {
- QPID_LOG(error, "TxnBuffer::handleAsyncResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
- << " (" << arh->getErrMsg() << ")");
- tac->getTxnBuffer()->asyncLocalAbort();
- } else {
- if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) {
- tac->getTxnBuffer()->asyncLocalAbort();
- } else {
- tac->getTxnBuffer()->asyncLocalCommit();
- }
- }
- }
-}
-
void
TxnBuffer::asyncLocalCommit()
{
@@ -130,10 +110,9 @@ TxnBuffer::asyncLocalCommit()
m_state = COMMIT;
{
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
- m_txnHandle,
- qpid::asyncStore::AsyncOperation::TXN_COMMIT,
- &handleAsyncResult,
+ &handleAsyncCommitResult,
&m_resultQueue));
+ m_store->testOp();
m_store->submitCommit(m_txnHandle, tac);
}
break;
@@ -147,6 +126,21 @@ TxnBuffer::asyncLocalCommit()
}
}
+//static
+void
+TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ QPID_LOG(error, "TxnBuffer::handleAsyncCommitResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
+ << " (" << arh->getErrMsg() << ")");
+ tac->getTxnBuffer()->asyncLocalAbort();
+ } else {
+ tac->getTxnBuffer()->asyncLocalCommit();
+ }
+ }
+}
+
void
TxnBuffer::asyncLocalAbort()
{
@@ -158,9 +152,7 @@ TxnBuffer::asyncLocalAbort()
m_state = ROLLBACK;
{
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
- m_txnHandle,
- qpid::asyncStore::AsyncOperation::TXN_ABORT,
- &handleAsyncResult,
+ &handleAsyncAbortResult,
&m_resultQueue));
m_store->submitCommit(m_txnHandle, tac);
}
@@ -173,4 +165,17 @@ TxnBuffer::asyncLocalAbort()
}
}
+//static
+void
+TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
+ << " (" << arh->getErrMsg() << ")");
+ }
+ tac->getTxnBuffer()->asyncLocalAbort();
+ }
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h
index cd78846b71..bd9743cad7 100644
--- a/cpp/src/qpid/broker/TxnBuffer.h
+++ b/cpp/src/qpid/broker/TxnBuffer.h
@@ -51,9 +51,10 @@ public:
bool commitLocal(AsyncTransaction* const store);
// --- Async operations ---
- static void handleAsyncResult(const AsyncResultHandle* const arh);
void asyncLocalCommit();
+ static void handleAsyncCommitResult(const AsyncResultHandle* const arh);
void asyncLocalAbort();
+ static void handleAsyncAbortResult(const AsyncResultHandle* const arh);
private:
std::vector<boost::shared_ptr<TxnOp> > m_ops;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
index 5bcf3fe401..e3bfe9ae7a 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
@@ -32,10 +32,8 @@ namespace storePerftools {
namespace asyncPerf {
MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
- const qpid::asyncStore::AsyncOperation::opCode op,
boost::shared_ptr<SimpleQueue> q) :
m_msg(msg),
- m_op(op),
m_q(q)
{
assert(m_msg.get() != 0);
@@ -45,18 +43,6 @@ MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg
MessageAsyncContext::~MessageAsyncContext()
{}
-qpid::asyncStore::AsyncOperation::opCode
-MessageAsyncContext::getOpCode() const
-{
- return m_op;
-}
-
-const char*
-MessageAsyncContext::getOpStr() const
-{
- return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
-}
-
boost::intrusive_ptr<SimpleMessage>
MessageAsyncContext::getMessage() const
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
index 77d7be286b..9252fbda45 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
@@ -40,18 +40,14 @@ class MessageAsyncContext : public qpid::broker::BrokerAsyncContext
{
public:
MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
- const qpid::asyncStore::AsyncOperation::opCode op,
boost::shared_ptr<SimpleQueue> q);
virtual ~MessageAsyncContext();
- qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
- const char* getOpStr() const;
boost::intrusive_ptr<SimpleMessage> getMessage() const;
boost::shared_ptr<SimpleQueue> getQueue() const;
void destroy();
private:
boost::intrusive_ptr<SimpleMessage> m_msg;
- const qpid::asyncStore::AsyncOperation::opCode m_op;
boost::shared_ptr<SimpleQueue> m_q;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
index f2eea9bad3..0312f61d3c 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
@@ -33,12 +33,10 @@ namespace asyncPerf {
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
qpid::broker::TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq) :
m_q(q),
m_th(th),
- m_op(op),
m_rcb(rcb),
m_arq(arq)
{
@@ -48,13 +46,11 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
boost::intrusive_ptr<SimpleMessage> msg,
qpid::broker::TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq) :
m_q(q),
m_msg(msg),
m_th(th),
- m_op(op),
m_rcb(rcb),
m_arq(arq)
{
@@ -65,18 +61,6 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
QueueAsyncContext::~QueueAsyncContext()
{}
-qpid::asyncStore::AsyncOperation::opCode
-QueueAsyncContext::getOpCode() const
-{
- return m_op;
-}
-
-const char*
-QueueAsyncContext::getOpStr() const
-{
- return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
-}
-
boost::shared_ptr<SimpleQueue>
QueueAsyncContext::getQueue() const
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
index e3e87b8ad8..4e3d9fe2db 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
@@ -49,18 +49,14 @@ class QueueAsyncContext: public qpid::broker::BrokerAsyncContext
public:
QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
qpid::broker::TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
boost::intrusive_ptr<SimpleMessage> msg,
qpid::broker::TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq);
virtual ~QueueAsyncContext();
- qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
- const char* getOpStr() const;
boost::shared_ptr<SimpleQueue> getQueue() const;
boost::intrusive_ptr<SimpleMessage> getMessage() const;
qpid::broker::TxnHandle getTxnHandle() const;
@@ -72,8 +68,7 @@ public:
private:
boost::shared_ptr<SimpleQueue> m_q;
boost::intrusive_ptr<SimpleMessage> m_msg;
- qpid::broker::TxnHandle m_th;
- const qpid::asyncStore::AsyncOperation::opCode m_op;
+ qpid::broker::TxnHandle m_th; // TODO: get rid of this
qpid::broker::AsyncResultCallback m_rcb;
qpid::broker::AsyncResultQueue* const m_arq;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index 2a6f2b208b..79b8b46919 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -33,7 +33,6 @@
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/TxnHandle.h"
-#include <boost/make_shared.hpp>
#include <string.h> // memcpy()
namespace tests {
@@ -69,43 +68,6 @@ SimpleQueue::SimpleQueue(const std::string& name,
SimpleQueue::~SimpleQueue()
{}
-// static
-void
-SimpleQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh)
-{
- if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
- if (arh->getErrNo()) {
- // TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
- << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
- } else {
- // Handle async success here
- switch(qc->getOpCode()) {
- case qpid::asyncStore::AsyncOperation::QUEUE_CREATE:
- qc->getQueue()->createComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH:
- qc->getQueue()->flushComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY:
- qc->getQueue()->destroyComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE:
- qc->getQueue()->enqueueComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE:
- qc->getQueue()->dequeueComplete(qc);
- break;
- default:
- std::ostringstream oss;
- oss << "tests::storePerftools::asyncPerf::SimpleQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode();
- throw qpid::Exception(oss.str());
- };
- }
- }
-}
-
const qpid::broker::QueueHandle&
SimpleQueue::getHandle() const
{
@@ -130,16 +92,28 @@ SimpleQueue::asyncCreate()
if (m_store) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
s_nullTxnHandle,
- qpid::asyncStore::AsyncOperation::QUEUE_CREATE,
- &handleAsyncResult,
+ &handleAsyncCreateResult,
&m_resultQueue));
- m_store->submitCreate(m_queueHandle,
- this,
- qac);
+ m_store->submitCreate(m_queueHandle, this, qac);
++m_asyncOpCounter;
}
}
+//static
+void
+SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->createComplete(qc);
+ }
+ }
+}
+
void
SimpleQueue::asyncDestroy(const bool deleteQueue)
{
@@ -148,25 +122,38 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
if (deleteQueue) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
s_nullTxnHandle,
- qpid::asyncStore::AsyncOperation::QUEUE_DESTROY,
- &handleAsyncResult,
+ &handleAsyncDestroyResult,
&m_resultQueue));
- m_store->submitDestroy(m_queueHandle,
- qac);
+ m_store->submitDestroy(m_queueHandle, qac);
++m_asyncOpCounter;
}
m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000));
}
}
+//static
+void
+SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->destroyComplete(qc);
+ }
+ }
+}
+
void
SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
{
boost::shared_ptr<QueuedMessage> qm;
if (msg->isPersistent() && m_store) {
- qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
} else {
- qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
}
enqueue(s_nullTxnHandle, qm);
push(qm);
@@ -231,9 +218,9 @@ SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
{
boost::shared_ptr<QueuedMessage> qm;
if (msg->isPersistent() && m_store) {
- qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
} else {
- qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
}
push(qm);
}
@@ -357,9 +344,6 @@ void
SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
bool /*isRecovery*/)
{
-boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
-assert(pqm.get());
-
m_messages->push(qm);
}
@@ -375,8 +359,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
pqm->payload(),
th,
- qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
- &handleAsyncResult,
+ &handleAsyncEnqueueResult,
&m_resultQueue));
// TODO : This must be done from inside store, not here
if (th.isValid()) {
@@ -389,6 +372,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
return true;
}
+// private static
+void
+SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->enqueueComplete(qc);
+ }
+ }
+}
+
// private
bool
SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
@@ -398,8 +396,7 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
pqm->payload(),
th,
- qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
- &handleAsyncResult,
+ &handleAsyncDequeueResult,
&m_resultQueue));
// TODO : This must be done from inside store, not here
if (th.isValid()) {
@@ -411,6 +408,20 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
++m_asyncOpCounter;
return true;
}
+// private static
+void
+SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->dequeueComplete(qc);
+ }
+ }
+}
// private
void
@@ -455,9 +466,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
- qpid::broker::TxnHandle th = qc->getTxnHandle();
-
// TODO : This must be done from inside store, not here
+ qpid::broker::TxnHandle th = qc->getTxnHandle();
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}
@@ -470,9 +480,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
- qpid::broker::TxnHandle th = qc->getTxnHandle();
-
// TODO : This must be done from inside store, not here
+ qpid::broker::TxnHandle th = qc->getTxnHandle();
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
index 2763ae3159..f13febbafa 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
@@ -35,9 +35,6 @@
#include <boost/enable_shared_from_this.hpp>
namespace qpid {
-namespace asyncStore {
-//class AsyncStoreImpl;
-}
namespace broker {
class AsyncResultQueue;
}
@@ -67,13 +64,14 @@ public:
qpid::broker::AsyncResultQueue& arq);
virtual ~SimpleQueue();
- static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res);
const qpid::broker::QueueHandle& getHandle() const;
qpid::broker::QueueHandle& getHandle();
qpid::broker::AsyncStore* getStore();
void asyncCreate();
+ static void handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh);
void asyncDestroy(const bool deleteQueue);
+ static void handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh);
// --- Methods in msg handling path from qpid::Queue ---
void deliver(boost::intrusive_ptr<SimpleMessage> msg);
@@ -98,7 +96,7 @@ public:
virtual const std::string& getName() const;
virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst);
- // --- Interface DataStore ---
+ // --- Interface qpid::broker::DataStore ---
virtual uint64_t getSize();
virtual void write(char* target);
@@ -116,8 +114,7 @@ private:
bool m_destroyed;
// --- Members & methods in msg handling path copied from qpid::Queue ---
- struct UsageBarrier
- {
+ struct UsageBarrier {
SimpleQueue& m_parent;
uint32_t m_count;
qpid::sys::Monitor m_monitor;
@@ -126,8 +123,7 @@ private:
void release();
void destroy();
};
- struct ScopedUse
- {
+ struct ScopedUse {
UsageBarrier& m_barrier;
const bool m_acquired;
ScopedUse(UsageBarrier& b);
@@ -141,8 +137,10 @@ private:
// -- Async ops ---
bool asyncEnqueue(qpid::broker::TxnHandle& th,
boost::shared_ptr<PersistableQueuedMessage> pqm);
+ static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh);
bool asyncDequeue(qpid::broker::TxnHandle& th,
boost::shared_ptr<PersistableQueuedMessage> pqm);
+ static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh);
// --- Async op counter ---
void destroyCheck(const std::string& opDescr) const;