summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.cpp94
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.h71
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp24
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp10
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h1
-rw-r--r--cpp/src/qpid/broker/SimpleTxnBuffer.cpp1
6 files changed, 110 insertions, 91 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
index 3a53b4df1b..1a67a15d6f 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
@@ -31,8 +31,10 @@
namespace qpid {
namespace asyncStore {
-AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- m_brokerCtxt(brokerCtxt)
+AsyncOperation::AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store) :
+ m_brokerCtxt(brokerCtxt),
+ m_store(store)
{}
AsyncOperation::~AsyncOperation() {}
@@ -42,13 +44,13 @@ boost::shared_ptr<qpid::broker::BrokerAsyncContext> AsyncOperation::getBrokerCon
}
void
-AsyncOperation::submitResult() {
+AsyncOperation::submitResult() const {
return submitResult(0, "");
}
void
AsyncOperation::submitResult(const int errNo,
- const std::string& errMsg) {
+ const std::string& errMsg) const {
if (m_brokerCtxt.get()) {
qpid::broker::AsyncResultQueue* const arq = m_brokerCtxt->getAsyncResultQueue();
if (arq) {
@@ -63,15 +65,16 @@ AsyncOperation::submitResult(const int errNo,
// --- class AsyncOpTxnPrepare ---
AsyncOpTxnPrepare::AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt) :
- AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)),
+ boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt), store),
m_txnHandle(txnHandle)
{}
AsyncOpTxnPrepare::~AsyncOpTxnPrepare() {}
void
-AsyncOpTxnPrepare::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+AsyncOpTxnPrepare::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -86,15 +89,16 @@ AsyncOpTxnPrepare::getOpStr() const {
// --- class AsyncOpTxnCommit ---
AsyncOpTxnCommit::AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) :
- AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)),
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt), store),
m_txnHandle(txnHandle)
{}
AsyncOpTxnCommit::~AsyncOpTxnCommit() {}
void
-AsyncOpTxnCommit::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+AsyncOpTxnCommit::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -108,15 +112,16 @@ AsyncOpTxnCommit::getOpStr() const {
// --- class AsyncOpTxnAbort ---
AsyncOpTxnAbort::AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt) :
- AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt)),
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(txnCtxt), store),
m_txnHandle(txnHandle)
{}
AsyncOpTxnAbort::~AsyncOpTxnAbort() {}
void
-AsyncOpTxnAbort::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+AsyncOpTxnAbort::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -131,8 +136,9 @@ AsyncOpTxnAbort::getOpStr() const {
AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
const qpid::broker::DataSource* const data,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(brokerCtxt, store),
m_cfgHandle(cfgHandle),
m_data(data)
{}
@@ -140,7 +146,7 @@ AsyncOpConfigCreate::AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
AsyncOpConfigCreate::~AsyncOpConfigCreate() {}
void
-AsyncOpConfigCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+AsyncOpConfigCreate::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -154,15 +160,16 @@ AsyncOpConfigCreate::getOpStr() const {
// --- class AsyncOpConfigDestroy ---
AsyncOpConfigDestroy::AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(brokerCtxt, store),
m_cfgHandle(cfgHandle)
{}
AsyncOpConfigDestroy::~AsyncOpConfigDestroy() {}
void
-AsyncOpConfigDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+AsyncOpConfigDestroy::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -177,8 +184,9 @@ AsyncOpConfigDestroy::getOpStr() const {
AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const data,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) :
- AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)),
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt), store),
m_queueHandle(queueHandle),
m_data(data)
{}
@@ -186,7 +194,7 @@ AsyncOpQueueCreate::AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
AsyncOpQueueCreate::~AsyncOpQueueCreate() {}
void
-AsyncOpQueueCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+AsyncOpQueueCreate::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -200,15 +208,16 @@ AsyncOpQueueCreate::getOpStr() const {
// --- class AsyncOpQueueFlush ---
AsyncOpQueueFlush::AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) :
- AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)),
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt), store),
m_queueHandle(queueHandle)
{}
AsyncOpQueueFlush::~AsyncOpQueueFlush() {}
void
-AsyncOpQueueFlush::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+AsyncOpQueueFlush::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -222,15 +231,16 @@ AsyncOpQueueFlush::getOpStr() const {
// --- class AsyncOpQueueDestroy ---
AsyncOpQueueDestroy::AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt) :
- AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt)),
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(boost::dynamic_pointer_cast<qpid::broker::BrokerAsyncContext>(queueCtxt), store),
m_queueHandle(queueHandle)
{}
AsyncOpQueueDestroy::~AsyncOpQueueDestroy() {}
void
-AsyncOpQueueDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+AsyncOpQueueDestroy::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -246,8 +256,9 @@ AsyncOpQueueDestroy::getOpStr() const {
AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle,
const qpid::broker::DataSource* const data,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(brokerCtxt, store),
m_evtHandle(evtHandle),
m_data(data),
m_txnHandle(txnHandle)
@@ -256,7 +267,7 @@ AsyncOpEventCreate::AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle,
AsyncOpEventCreate::~AsyncOpEventCreate() {}
void
-AsyncOpEventCreate::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+AsyncOpEventCreate::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -271,8 +282,9 @@ AsyncOpEventCreate::getOpStr() const {
AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(brokerCtxt, store),
m_evtHandle(evtHandle),
m_txnHandle(txnHandle)
{}
@@ -280,7 +292,7 @@ AsyncOpEventDestroy::AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle,
AsyncOpEventDestroy::~AsyncOpEventDestroy() {}
void
-AsyncOpEventDestroy::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+AsyncOpEventDestroy::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -295,15 +307,16 @@ AsyncOpEventDestroy::getOpStr() const {
AsyncOpMsgEnqueue::AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(brokerCtxt, store),
m_enqHandle(enqHandle),
m_txnHandle(txnHandle)
{}
AsyncOpMsgEnqueue::~AsyncOpMsgEnqueue() {}
-void AsyncOpMsgEnqueue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+void AsyncOpMsgEnqueue::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
@@ -317,15 +330,16 @@ const char* AsyncOpMsgEnqueue::getOpStr() const {
AsyncOpMsgDequeue::AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
- AsyncOperation(brokerCtxt),
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store) :
+ AsyncOperation(brokerCtxt, store),
m_enqHandle(enqHandle),
m_txnHandle(txnHandle)
{}
AsyncOpMsgDequeue::~AsyncOpMsgDequeue() {}
-void AsyncOpMsgDequeue::executeOp(boost::shared_ptr<AsyncStoreImpl> /*store*/) {
+void AsyncOpMsgDequeue::executeOp() const {
// TODO: Implement store operation here
submitResult();
}
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h
index 2894816ca4..f22cccad07 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.h
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.h
@@ -34,26 +34,30 @@ class AsyncStoreImpl;
class AsyncOperation {
public:
- AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ AsyncOperation(boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOperation();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store) = 0;
+ virtual void executeOp() const = 0;
boost::shared_ptr<qpid::broker::BrokerAsyncContext> getBrokerContext() const;
virtual const char* getOpStr() const = 0;
protected:
- void submitResult();
+ void submitResult() const;
void submitResult(const int errNo,
- const std::string& errMsg);
+ const std::string& errMsg) const;
private:
boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt;
+protected:
+ qpid::broker::AsyncStore* m_store;
};
class AsyncOpTxnPrepare: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpTxnPrepare(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt);
+ boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> txnCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpTxnPrepare();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::TxnHandle& m_txnHandle;
@@ -63,9 +67,10 @@ private:
class AsyncOpTxnCommit: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpTxnCommit(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt);
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpTxnCommit();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::TxnHandle& m_txnHandle;
@@ -75,9 +80,10 @@ private:
class AsyncOpTxnAbort: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpTxnAbort(qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt);
+ boost::shared_ptr<qpid::broker::TxnAsyncContext> txnCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpTxnAbort();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::TxnHandle& m_txnHandle;
@@ -88,9 +94,10 @@ class AsyncOpConfigCreate: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpConfigCreate(qpid::broker::ConfigHandle& cfgHandle,
const qpid::broker::DataSource* const data,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpConfigCreate();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::ConfigHandle& m_cfgHandle;
@@ -101,9 +108,10 @@ private:
class AsyncOpConfigDestroy: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpConfigDestroy(qpid::broker::ConfigHandle& cfgHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpConfigDestroy();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::ConfigHandle& m_cfgHandle;
@@ -114,9 +122,10 @@ class AsyncOpQueueCreate: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpQueueCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const data,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpQueueCreate();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::QueueHandle& m_queueHandle;
@@ -127,9 +136,10 @@ private:
class AsyncOpQueueFlush: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpQueueFlush(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpQueueFlush();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::QueueHandle& m_queueHandle;
@@ -139,9 +149,10 @@ private:
class AsyncOpQueueDestroy: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpQueueDestroy(qpid::broker::QueueHandle& queueHandle,
- boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt);
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> queueCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpQueueDestroy();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::QueueHandle& m_queueHandle;
@@ -153,9 +164,10 @@ public:
AsyncOpEventCreate(qpid::broker::EventHandle& evtHandle,
const qpid::broker::DataSource* const data,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpEventCreate();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::EventHandle& m_evtHandle;
@@ -168,9 +180,10 @@ class AsyncOpEventDestroy: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpEventDestroy(qpid::broker::EventHandle& evtHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpEventDestroy();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::EventHandle& m_evtHandle;
@@ -182,9 +195,10 @@ class AsyncOpMsgEnqueue: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpMsgEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpMsgEnqueue();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::EnqueueHandle& m_enqHandle;
@@ -196,9 +210,10 @@ class AsyncOpMsgDequeue: public qpid::asyncStore::AsyncOperation {
public:
AsyncOpMsgDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt,
+ qpid::broker::AsyncStore* store);
virtual ~AsyncOpMsgDequeue();
- virtual void executeOp(boost::shared_ptr<AsyncStoreImpl> store);
+ virtual void executeOp() const;
virtual const char* getOpStr() const;
private:
qpid::broker::EnqueueHandle& m_enqHandle;
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 5c62782278..038f4b4238 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -90,7 +90,7 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid,
void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnPrepare(txnHandle, TxnCtxt, this));
TxnCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -98,7 +98,7 @@ AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
void
AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnCommit(txnHandle, TxnCtxt, this));
TxnCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -106,7 +106,7 @@ AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
void
AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::TxnAsyncContext> TxnCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpTxnAbort(txnHandle, TxnCtxt, this));
TxnCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -145,7 +145,7 @@ void
AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigCreate(cfgHandle, dataSrc, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -153,7 +153,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
void
AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpConfigDestroy(cfgHandle, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -162,7 +162,7 @@ void
AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
const qpid::broker::DataSource* const dataSrc,
boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueCreate(queueHandle, dataSrc, QueueCtxt, this));
QueueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -170,7 +170,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
void
AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueDestroy(queueHandle, QueueCtxt, this));
QueueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -178,7 +178,7 @@ AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
void
AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpQueueFlush(queueHandle, QueueCtxt, this));
QueueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -188,7 +188,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
const qpid::broker::DataSource* const dataSrc,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventCreate(eventHandle, dataSrc, txnHandle, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -197,7 +197,7 @@ void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpEventDestroy(eventHandle, txnHandle, brokerCtxt, this));
brokerCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -206,7 +206,7 @@ void
AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgEnqueue(enqHandle, txnHandle, QueueCtxt, this));
QueueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
@@ -215,7 +215,7 @@ void
AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::QueueAsyncContext> QueueCtxt) {
- boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt));
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOpMsgDequeue(enqHandle, txnHandle, QueueCtxt, this));
QueueCtxt->setOpStr(op->getOpStr());
m_operations.submit(op);
}
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index 09fa8c7048..51dcd7392a 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -50,15 +50,7 @@ OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) {
try {
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
- boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->getBrokerContext();
- if (bc.get()) {
- qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
- if (arq) {
- qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc);
- boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
- arq->submit(arh);
- }
- }
+ (*i)->executeOp(); // Do store work here
}
} catch (const std::exception& e) {
QPID_LOG(error, "qpid::asyncStore::OperationQueue: Exception thrown processing async op: " << e.what());
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 7009565a7c..e274a4e196 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -90,7 +90,6 @@ public:
boost::shared_ptr<TxnAsyncContext>) = 0;
virtual void submitAbort(TxnHandle&,
boost::shared_ptr<TxnAsyncContext>) = 0;
- void testOp() const {}
};
// Subclassed by store:
diff --git a/cpp/src/qpid/broker/SimpleTxnBuffer.cpp b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp
index cb9c54d2d4..7995eae874 100644
--- a/cpp/src/qpid/broker/SimpleTxnBuffer.cpp
+++ b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp
@@ -164,7 +164,6 @@ SimpleTxnBuffer::asyncLocalCommit() {
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
&handleAsyncCommitResult,
&m_resultQueue));
- m_store->testOp();
m_store->submitCommit(m_txnHandle, tac);
break;
}