summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-08 14:59:04 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-08 14:59:04 +0000
commit56094d9861141097e107a71dac4c808e0aca9c5f (patch)
tree4a62659c9d6c7dc47549b486483c1afa45183a9a
parent22d453646b4815752134ad62e0b27841a103afb2 (diff)
downloadqpid-python-56094d9861141097e107a71dac4c808e0aca9c5f.tar.gz
QPID-3858: WIP - completed async return path
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1348098 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt2
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.cpp24
-rw-r--r--cpp/src/qpid/asyncStore/AsyncOperation.h21
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp199
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h75
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp29
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.h8
-rw-r--r--cpp/src/qpid/asyncStore/Plugin.cpp2
-rw-r--r--cpp/src/qpid/broker/AsyncResultHandle.cpp8
-rw-r--r--cpp/src/qpid/broker/AsyncResultHandle.h5
-rw-r--r--cpp/src/qpid/broker/AsyncResultHandleImpl.cpp11
-rw-r--r--cpp/src/qpid/broker/AsyncResultHandleImpl.h10
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueue.cpp62
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueueImpl.cpp62
-rw-r--r--cpp/src/qpid/broker/AsyncResultQueueImpl.h (renamed from cpp/src/qpid/broker/AsyncResultQueue.h)21
-rw-r--r--cpp/src/qpid/broker/AsyncStore.cpp25
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h86
-rw-r--r--cpp/src/qpid/broker/ConfigHandle.h2
-rw-r--r--cpp/src/qpid/broker/EnqueueHandle.h2
-rw-r--r--cpp/src/qpid/broker/EventHandle.h2
-rw-r--r--cpp/src/qpid/broker/MessageHandle.h2
-rw-r--r--cpp/src/qpid/broker/QueueHandle.h2
-rw-r--r--cpp/src/qpid/broker/TxnHandle.h2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp76
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h15
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp3
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp34
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h15
30 files changed, 375 insertions, 436 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index b5ac6af825..575c78e320 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1078,7 +1078,7 @@ set (qpidbroker_SOURCES
qpid/amqp_0_10/Connection.cpp
qpid/broker/AsyncResultHandle.cpp
qpid/broker/AsyncResultHandleImpl.cpp
- qpid/broker/AsyncResultQueue.cpp
+ qpid/broker/AsyncResultQueueImpl.cpp
qpid/broker/AsyncStore.cpp
qpid/broker/Broker.cpp
qpid/broker/Credit.cpp
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
index 19b15b12d0..0d8ff1535e 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp
@@ -34,60 +34,50 @@ AsyncOperation::AsyncOperation() :
m_op(NONE),
m_targetHandle(),
m_dataSrc(0),
- m_txnHandle(0),
- m_resCb(0),
- m_brokerCtxt(0)
+ m_txnHandle(0)
{}
AsyncOperation::AsyncOperation(const opCode op,
const qpid::broker::IdHandle* th,
- const qpid::broker::ResultCallback resCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt) :
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
m_op(op),
m_targetHandle(th),
m_dataSrc(0),
m_txnHandle(0),
- m_resCb(resCb),
m_brokerCtxt(brokerCtxt)
{}
AsyncOperation::AsyncOperation(const opCode op,
const qpid::broker::IdHandle* th,
- const qpid::broker::DataSource* dataSrc,
- const qpid::broker::ResultCallback resCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt) :
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
m_op(op),
m_targetHandle(th),
m_dataSrc(dataSrc),
m_txnHandle(0),
- m_resCb(resCb),
m_brokerCtxt(brokerCtxt)
{}
AsyncOperation::AsyncOperation(const opCode op,
const qpid::broker::IdHandle* th,
const qpid::broker::TxnHandle* txnHandle,
- const qpid::broker::ResultCallback resCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt) :
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
m_op(op),
m_targetHandle(th),
m_dataSrc(0),
m_txnHandle(txnHandle),
- m_resCb(resCb),
m_brokerCtxt(brokerCtxt)
{}
AsyncOperation::AsyncOperation(const opCode op,
const qpid::broker::IdHandle* th,
- const qpid::broker::DataSource* dataSrc,
+ const qpid::broker::DataSource* const dataSrc,
const qpid::broker::TxnHandle* txnHandle,
- const qpid::broker::ResultCallback resCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt) :
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) :
m_op(op),
m_targetHandle(th),
m_dataSrc(dataSrc),
m_txnHandle(txnHandle),
- m_resCb(resCb),
m_brokerCtxt(brokerCtxt)
{}
diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h
index 998d03955e..24f8baeb4d 100644
--- a/cpp/src/qpid/asyncStore/AsyncOperation.h
+++ b/cpp/src/qpid/asyncStore/AsyncOperation.h
@@ -50,34 +50,29 @@ public:
AsyncOperation();
AsyncOperation(const opCode op,
const qpid::broker::IdHandle* th,
- const qpid::broker::ResultCallback resCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
AsyncOperation(const opCode op,
const qpid::broker::IdHandle* th,
- const qpid::broker::DataSource* dataSrc,
- const qpid::broker::ResultCallback resCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
AsyncOperation(const opCode op,
const qpid::broker::IdHandle* th,
const qpid::broker::TxnHandle* txnHandle,
- const qpid::broker::ResultCallback resCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
AsyncOperation(const opCode op,
const qpid::broker::IdHandle* th,
- const qpid::broker::DataSource* dataSrc,
+ const qpid::broker::DataSource* const dataSrc,
const qpid::broker::TxnHandle* txnHandle,
- const qpid::broker::ResultCallback resCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
virtual ~AsyncOperation();
const char* getOpStr() const;
static const char* getOpStr(const opCode op);
opCode m_op;
const qpid::broker::IdHandle* m_targetHandle;
- const qpid::broker::DataSource* m_dataSrc;
+ const qpid::broker::DataSource* const m_dataSrc;
const qpid::broker::TxnHandle* m_txnHandle;
- const qpid::broker::ResultCallback m_resCb;
- qpid::broker::BrokerAsyncContext* m_brokerCtxt;
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt;
};
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 6283d07ee9..a9fc13363a 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -38,12 +38,11 @@ namespace qpid {
namespace asyncStore {
AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
- const AsyncStoreOptions& opts,
- qpid::broker::AsyncResultQueue* resultQueue) :
+ const AsyncStoreOptions& opts) :
m_poller(poller),
m_opts(opts),
m_runState(),
- m_operations(m_poller, resultQueue)
+ m_operations(m_poller)
{}
AsyncStoreImpl::~AsyncStoreImpl()
@@ -63,23 +62,17 @@ void
AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/)
{}
-qpid::broker::TxnHandle
-AsyncStoreImpl::createTxnHandle(const std::string& xid)
-{
- return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
-}
-
qpid::broker::ConfigHandle
AsyncStoreImpl::createConfigHandle()
{
return qpid::broker::ConfigHandle(new ConfigHandleImpl());
}
-qpid::broker::QueueHandle
-AsyncStoreImpl::createQueueHandle(const std::string& name,
- const qpid::types::Variant::Map& opts)
+qpid::broker::EnqueueHandle
+AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
+ qpid::broker::QueueHandle& queueHandle)
{
- return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts));
+ return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle));
}
qpid::broker::EventHandle
@@ -90,202 +83,178 @@ AsyncStoreImpl::createEventHandle(qpid::broker::QueueHandle& queueHandle,
}
qpid::broker::MessageHandle
-AsyncStoreImpl::createMessageHandle(const qpid::broker::DataSource* dataSrc)
+AsyncStoreImpl::createMessageHandle(const qpid::broker::DataSource* const dataSrc)
{
return qpid::broker::MessageHandle(new MessageHandleImpl(dataSrc));
}
-qpid::broker::EnqueueHandle
-AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
- qpid::broker::QueueHandle& queueHandle)
+qpid::broker::QueueHandle
+AsyncStoreImpl::createQueueHandle(const std::string& name,
+ const qpid::types::Variant::Map& opts)
{
- return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle));
+ return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts));
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(const std::string& xid)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
}
void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_PREPARE,
- dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE,
+ dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_COMMIT,
- dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT,
+ dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_ABORT,
- dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT,
+ dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
- const qpid::broker::DataSource* dataSrc,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
- dataSrc,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
+ dataSrc,
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
- const qpid::broker::DataSource* dataSrc,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
- dataSrc,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
+ dataSrc,
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_FLUSH,
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH,
dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
- resultCb,
- brokerCtxt);
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
- const qpid::broker::DataSource* dataSrc,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
- dataSrc,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ dataSrc,
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
- const qpid::broker::DataSource* dataSrc,
+ const qpid::broker::DataSource* const dataSrc,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
- dataSrc,
- &txnHandle,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ dataSrc,
+ &txnHandle,
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
- &txnHandle,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ &txnHandle,
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
- &txnHandle,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
+ dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
+ &txnHandle,
+ brokerCtxt));
m_operations.submit(op);
-//delete op;
-//delete brokerCtxt;
}
void
AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
- &txnHandle,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
+ dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
+ &txnHandle,
+ brokerCtxt));
m_operations.submit(op);
}
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 717723eda3..60365e0e8a 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -43,82 +43,71 @@ namespace asyncStore {
class AsyncStoreImpl: public qpid::broker::AsyncStore {
public:
AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
- const AsyncStoreOptions& opts,
- qpid::broker::AsyncResultQueue* resultQueue);
+ const AsyncStoreOptions& opts);
virtual ~AsyncStoreImpl();
void initialize();
- uint64_t getNextRid();
+ uint64_t getNextRid(); // Global counter for journal RIDs
- // Management
+ // --- Management ---
void initManagement(qpid::broker::Broker* broker);
- // AsyncStore interface
+ // --- Factory methods for creating handles ---
+
- qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string());
qpid::broker::ConfigHandle createConfigHandle();
- qpid::broker::QueueHandle createQueueHandle(const std::string& name,
- const qpid::types::Variant::Map& opts);
- qpid::broker::EventHandle createEventHandle(qpid::broker::QueueHandle& queueHandle,
- const std::string& key=std::string());
- qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* dataSrc);
qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
qpid::broker::QueueHandle& queueHandle);
+ qpid::broker::EventHandle createEventHandle(qpid::broker::QueueHandle& queueHandle,
+ const std::string& key=std::string());
+ qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* const dataSrc);
+ qpid::broker::QueueHandle createQueueHandle(const std::string& name,
+ const qpid::types::Variant::Map& opts);
+ qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string());
+
+
+ // --- Store async interface ---
void submitPrepare(qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitCommit(qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitAbort(qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitCreate(qpid::broker::ConfigHandle& cfgHandle,
- const qpid::broker::DataSource* dataSrc,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitCreate(qpid::broker::QueueHandle& queueHandle,
- const qpid::broker::DataSource* dataSrc,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitDestroy(qpid::broker::QueueHandle& queueHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitFlush(qpid::broker::QueueHandle& queueHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitCreate(qpid::broker::EventHandle& eventHandle,
- const qpid::broker::DataSource* dataSrc,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitCreate(qpid::broker::EventHandle& eventHandle,
- const qpid::broker::DataSource* dataSrc,
+ const qpid::broker::DataSource* const dataSrc,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitDestroy(qpid::broker::EventHandle& eventHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
void submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt);
// Legacy - Restore FTD message, is NOT async!
virtual int loadContent(qpid::broker::MessageHandle& msgHandle,
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index f13114f41e..a455e445ab 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -28,10 +28,8 @@
namespace qpid {
namespace asyncStore {
-OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller,
- qpid::broker::AsyncResultQueue* resultQueue) :
- m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller),
- m_resultQueue(resultQueue)
+OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) :
+ m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller)
{
m_opQueue.start();
}
@@ -42,9 +40,9 @@ OperationQueue::~OperationQueue()
}
void
-OperationQueue::submit(const AsyncOperation* op)
+OperationQueue::submit(boost::shared_ptr<const AsyncOperation> op)
{
-std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
+//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
m_opQueue.push(op);
}
@@ -53,19 +51,16 @@ OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
{
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
-std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
- qpid::broker::BrokerAsyncContext* bc = (*i)->m_brokerCtxt;
- qpid::broker::ResultCallback rcb = (*i)->m_resCb;
- if (rcb) {
-// ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt);
-// rcb(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc)));
- if (m_resultQueue) {
- (m_resultQueue->*rcb)(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc)));
+//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->m_brokerCtxt;
+ if (bc) {
+ qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
+ if (arq) {
+ qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc);
+ boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
+ arq->submit(arh);
}
- } else {
- delete bc;
}
- delete (*i);
}
return e.end();
}
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h
index eba7c829a3..23f1c0ee13 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.h
+++ b/cpp/src/qpid/asyncStore/OperationQueue.h
@@ -35,15 +35,13 @@ namespace asyncStore {
class OperationQueue
{
public:
- OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller,
- qpid::broker::AsyncResultQueue* resultQueue = 0);
+ OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller);
virtual ~OperationQueue();
- void submit(const AsyncOperation* op);
+ void submit(boost::shared_ptr<const AsyncOperation> op);
protected:
- typedef qpid::sys::PollableQueue<const AsyncOperation*> OpQueue;
+ typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncOperation> > OpQueue;
OpQueue m_opQueue;
- qpid::broker::AsyncResultQueue* m_resultQueue;
OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e);
};
diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp
index 0441e9c082..4f35e8cd2a 100644
--- a/cpp/src/qpid/asyncStore/Plugin.cpp
+++ b/cpp/src/qpid/asyncStore/Plugin.cpp
@@ -41,7 +41,7 @@ Plugin::earlyInitialize(Target& target)
m_options.m_storeDir = dataDir.getPath ();
}
- m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options, 0)); // TODO: last arg: point to broker instance of AsyncResultQueue
+ m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options));
boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store);
broker->setAsyncStore(brokerAsyncStore);
boost::function<void()> fn = boost::bind(&Plugin::finalize, this);
diff --git a/cpp/src/qpid/broker/AsyncResultHandle.cpp b/cpp/src/qpid/broker/AsyncResultHandle.cpp
index 26e46fee1c..cdd2231977 100644
--- a/cpp/src/qpid/broker/AsyncResultHandle.cpp
+++ b/cpp/src/qpid/broker/AsyncResultHandle.cpp
@@ -65,10 +65,16 @@ AsyncResultHandle::getErrMsg() const
return impl->getErrMsg();
}
-const BrokerAsyncContext*
+boost::shared_ptr<BrokerAsyncContext>
AsyncResultHandle::getBrokerAsyncContext() const
{
return impl->getBrokerAsyncContext();
}
+void
+AsyncResultHandle::invokeAsyncResultCallback() const
+{
+ impl->getBrokerAsyncContext()->invokeCallback(this);
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/AsyncResultHandle.h b/cpp/src/qpid/broker/AsyncResultHandle.h
index 6f6290bfcb..f916bde5d3 100644
--- a/cpp/src/qpid/broker/AsyncResultHandle.h
+++ b/cpp/src/qpid/broker/AsyncResultHandle.h
@@ -43,11 +43,10 @@ public:
int getErrNo() const;
std::string getErrMsg() const;
- const BrokerAsyncContext* getBrokerAsyncContext() const;
+ boost::shared_ptr<BrokerAsyncContext> getBrokerAsyncContext() const;
+ void invokeAsyncResultCallback() const;
private:
- typedef qpid::broker::AsyncResultHandleImpl Impl;
- Impl* impl;
friend class qpid::messaging::PrivateImplRef<AsyncResultHandle>;
};
diff --git a/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp b/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp
index 36d45e7b0a..c8950d8ff1 100644
--- a/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp
+++ b/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp
@@ -28,17 +28,18 @@ namespace broker {
AsyncResultHandleImpl::AsyncResultHandleImpl() :
m_errNo(0),
- m_errMsg(),
- m_bc(0)
+ m_errMsg()
{}
-AsyncResultHandleImpl::AsyncResultHandleImpl(const BrokerAsyncContext* bc) :
+AsyncResultHandleImpl::AsyncResultHandleImpl(boost::shared_ptr<BrokerAsyncContext> bc) :
m_errNo(0),
m_errMsg(),
m_bc(bc)
{}
-AsyncResultHandleImpl::AsyncResultHandleImpl(const int errNo, const std::string& errMsg, const BrokerAsyncContext* bc) :
+AsyncResultHandleImpl::AsyncResultHandleImpl(const int errNo,
+ const std::string& errMsg,
+ boost::shared_ptr<BrokerAsyncContext> bc) :
m_errNo(errNo),
m_errMsg(errMsg),
m_bc(bc)
@@ -59,7 +60,7 @@ AsyncResultHandleImpl::getErrMsg() const
return m_errMsg;
}
-const BrokerAsyncContext*
+boost::shared_ptr<BrokerAsyncContext>
AsyncResultHandleImpl::getBrokerAsyncContext() const
{
return m_bc;
diff --git a/cpp/src/qpid/broker/AsyncResultHandleImpl.h b/cpp/src/qpid/broker/AsyncResultHandleImpl.h
index e1bd1fa0e9..4fe6d1248c 100644
--- a/cpp/src/qpid/broker/AsyncResultHandleImpl.h
+++ b/cpp/src/qpid/broker/AsyncResultHandleImpl.h
@@ -35,18 +35,20 @@ class AsyncResultHandleImpl : public virtual qpid::RefCounted
{
public:
AsyncResultHandleImpl();
- AsyncResultHandleImpl(const BrokerAsyncContext* bc);
- AsyncResultHandleImpl(const int errNo, const std::string& errMsg, const BrokerAsyncContext* bc);
+ AsyncResultHandleImpl(boost::shared_ptr<BrokerAsyncContext> bc);
+ AsyncResultHandleImpl(const int errNo,
+ const std::string& errMsg,
+ boost::shared_ptr<BrokerAsyncContext> bc);
virtual ~AsyncResultHandleImpl();
int getErrNo() const;
std::string getErrMsg() const;
- const BrokerAsyncContext* getBrokerAsyncContext() const;
+ boost::shared_ptr<BrokerAsyncContext> getBrokerAsyncContext() const;
private:
const int m_errNo;
const std::string m_errMsg;
- const BrokerAsyncContext* m_bc;
+ boost::shared_ptr<BrokerAsyncContext> m_bc;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/AsyncResultQueue.cpp b/cpp/src/qpid/broker/AsyncResultQueue.cpp
deleted file mode 100644
index 1094a582f4..0000000000
--- a/cpp/src/qpid/broker/AsyncResultQueue.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file AsyncResultQueue.cpp
- */
-
-#include "AsyncResultQueue.h"
-
-namespace qpid {
-namespace broker {
-
-AsyncResultQueue::AsyncResultQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) :
- m_resQueue(boost::bind(&AsyncResultQueue::handle, this, _1), poller)
-{
- m_resQueue.start();
-}
-
-AsyncResultQueue::~AsyncResultQueue()
-{
- m_resQueue.stop();
-}
-
-void
-AsyncResultQueue::submit(AsyncResultHandle* res)
-{
- m_resQueue.push(res);
-}
-
-//static
-/*
-void
-AsyncResultQueue::submit(AsyncResultQueue* arq, AsyncResultHandle* rh)
-{
- arq->submit(rh);
-}
-*/
-
-// protected
-AsyncResultQueue::ResultQueue::Batch::const_iterator
-AsyncResultQueue::handle(const ResultQueue::Batch& e)
-{
- return e.end();
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
new file mode 100644
index 0000000000..8c99ce8ef2
--- /dev/null
+++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AsyncResultQueueImpl.cpp
+ */
+
+#include "AsyncResultHandle.h"
+#include "AsyncResultQueueImpl.h"
+
+namespace qpid {
+namespace broker {
+
+AsyncResultQueueImpl::AsyncResultQueueImpl(const boost::shared_ptr<qpid::sys::Poller>& poller) :
+ m_resQueue(boost::bind(&AsyncResultQueueImpl::handle, this, _1), poller)
+{
+ m_resQueue.start();
+}
+
+AsyncResultQueueImpl::~AsyncResultQueueImpl()
+{
+ m_resQueue.stop();
+}
+
+void
+AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh)
+{
+//std::cout << "==> AsyncResultQueueImpl::submit() errNo=" << arh->getErrNo() << " errMsg=\"" << arh->getErrMsg() << "\"" << std::endl << std::flush;
+ m_resQueue.push(arh);
+}
+
+// protected
+AsyncResultQueueImpl::ResultQueue::Batch::const_iterator
+AsyncResultQueueImpl::handle(const ResultQueue::Batch& e)
+{
+
+ for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
+//std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush;
+ if ((*i)->isValid()) {
+ (*i)->invokeAsyncResultCallback();
+ }
+ }
+ return e.end();
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/AsyncResultQueue.h b/cpp/src/qpid/broker/AsyncResultQueueImpl.h
index 8881f25bac..fc93c2d806 100644
--- a/cpp/src/qpid/broker/AsyncResultQueue.h
+++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.h
@@ -18,11 +18,13 @@
*/
/**
- * \file AsyncResultQueue.h
+ * \file AsyncResultQueueImpl.h
*/
-#ifndef qpid_broker_AsyncResultQueue_h_
-#define qpid_broker_AsyncResultQueue_h_
+#ifndef qpid_broker_AsyncResultQueueImpl_h_
+#define qpid_broker_AsyncResultQueueImpl_h_
+
+#include "AsyncStore.h"
#include "qpid/sys/PollableQueue.h"
@@ -31,16 +33,15 @@ namespace broker {
class AsyncResultHandle;
-class AsyncResultQueue
+class AsyncResultQueueImpl : public AsyncResultQueue
{
public:
- AsyncResultQueue(const boost::shared_ptr<qpid::sys::Poller>& poller);
- virtual ~AsyncResultQueue();
- void submit(AsyncResultHandle* rh);
-// static void submit(AsyncResultQueue* arq, AsyncResultHandle* rh);
+ AsyncResultQueueImpl(const boost::shared_ptr<qpid::sys::Poller>& poller);
+ virtual ~AsyncResultQueueImpl();
+ virtual void submit(boost::shared_ptr<AsyncResultHandle> arh);
protected:
- typedef qpid::sys::PollableQueue<const AsyncResultHandle*> ResultQueue;
+ typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncResultHandle> > ResultQueue;
ResultQueue m_resQueue;
ResultQueue::Batch::const_iterator handle(const ResultQueue::Batch& e);
@@ -48,4 +49,4 @@ protected:
}} // namespace qpid::broker
-#endif // qpid_broker_AsyncResultQueue_h_
+#endif // qpid_broker_AsyncResultQueueImpl_h_
diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/AsyncStore.cpp
index d37b034648..10cb3d27cf 100644
--- a/cpp/src/qpid/broker/AsyncStore.cpp
+++ b/cpp/src/qpid/broker/AsyncStore.cpp
@@ -22,35 +22,16 @@
namespace qpid {
namespace broker {
-BrokerAsyncContext::~BrokerAsyncContext()
+AsyncResultQueue::~AsyncResultQueue()
{}
-DataSource::~DataSource()
+BrokerAsyncContext::~BrokerAsyncContext()
{}
-AsyncStore::AsyncStore()
+DataSource::~DataSource()
{}
AsyncStore::~AsyncStore()
{}
-/*
-AsyncResult::AsyncResult() :
- errNo(0),
- errMsg()
-{}
-
-AsyncResult::AsyncResult(const int errNo,
- const std::string& errMsg) :
- errNo(errNo),
- errMsg(errMsg)
-{}
-
-void
-AsyncResult::destroy()
-{
- delete this;
-}
-*/
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index c57bdaa552..7e2ee81620 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -24,48 +24,31 @@
// does not allow it. Using a local map<std::string, Variant> definition also precludes forward declaration.
#include "qpid/types/Variant.h" // qpid::types::Variant::Map
+#include <boost/shared_ptr.hpp>
#include <stdint.h>
#include <string>
namespace qpid {
namespace broker {
-// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting
-// Subclass this for specific contexts
-class BrokerAsyncContext {
-public:
- virtual ~BrokerAsyncContext();
-};
-
-// Callback definition:
-//struct AsyncResult {
-// int errNo; // 0 implies no error
-// std::string errMsg;
-// AsyncResult();
-// AsyncResult(const int errNo,
-// const std::string& errMsg);
-// void destroy();
-//};
-//typedef void (*ResultCallback)(const AsyncResult*, BrokerAsyncContext*);
-
+// This handle carries async op results
class AsyncResultHandle;
-class AsyncResultQueue; // Implements the result callback function
-// Singleton class in broker which contains return pollable queue. Use submitAsyncResult() to add reulsts to queue.
-class AsyncResultHandler {
+// Broker to subclass as a pollable queue
+class AsyncResultQueue {
public:
- virtual ~AsyncResultHandler();
-
- // Factory method to create result handle
-
- virtual AsyncResultHandle createAsyncResultHandle(const int errNo, const std::string& errMsg, BrokerAsyncContext*) = 0;
-
- // Async return interface
+ virtual ~AsyncResultQueue();
+ // TODO: Remove boost::shared_ptr<> from this interface
+ virtual void submit(boost::shared_ptr<AsyncResultHandle>) = 0;
+};
- virtual void submitAsyncResult(AsyncResultHandle&) = 0;
+// Subclass this for specific contexts
+class BrokerAsyncContext {
+public:
+ virtual ~BrokerAsyncContext();
+ virtual AsyncResultQueue* getAsyncResultQueue() const = 0;
+ virtual void invokeCallback(const AsyncResultHandle* const) const = 0;
};
-typedef void (qpid::broker::AsyncResultQueue::*ResultCallback)(AsyncResultHandle*);
-//typedef void (qpid::broker::AsyncResultQueue::*ResultCallback)(AsyncResultQueue*, AsyncResultHandle*);
class DataSource {
public:
@@ -74,6 +57,9 @@ public:
virtual void write(char* target) = 0;
};
+// Callback invoked by AsyncResultQueue to pass back async results
+typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
+
class ConfigHandle;
class EnqueueHandle;
class EventHandle;
@@ -85,39 +71,39 @@ class TxnHandle;
// Subclassed by store:
class AsyncStore {
public:
- AsyncStore();
virtual ~AsyncStore();
- // Factory methods for creating handles
+ // --- Factory methods for creating handles ---
virtual ConfigHandle createConfigHandle() = 0;
virtual EnqueueHandle createEnqueueHandle(MessageHandle&, QueueHandle&) = 0;
virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0;
- virtual MessageHandle createMessageHandle(const DataSource*) = 0;
+ virtual MessageHandle createMessageHandle(const DataSource* const) = 0;
virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0;
- virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0;
+ virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; // Distr. txns must supply xid
- // Store async interface
+ // --- Store async interface ---
- virtual void submitPrepare(TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; // Distributed txns only
- virtual void submitCommit(TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitAbort(TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
+ // TODO: Remove boost::shared_ptr<> from this interface
+ virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only
+ virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitCreate(ConfigHandle&, const DataSource*, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitDestroy(ConfigHandle&, ResultCallback, BrokerAsyncContext*) = 0;
+ virtual void submitCreate(ConfigHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitDestroy(ConfigHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitCreate(QueueHandle&, const DataSource*, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitDestroy(QueueHandle&, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitFlush(QueueHandle&, ResultCallback, BrokerAsyncContext*) = 0;
+ virtual void submitCreate(QueueHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitDestroy(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitFlush(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitCreate(EventHandle&, const DataSource*, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitCreate(EventHandle&, const DataSource*, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitDestroy(EventHandle&, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitDestroy(EventHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
+ virtual void submitCreate(EventHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitCreate(EventHandle&, const DataSource* const, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitDestroy(EventHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitDestroy(EventHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
- virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitDequeue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
+ virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
+ virtual void submitDequeue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0;
// Legacy - Restore FTD message, is NOT async!
virtual int loadContent(MessageHandle&, QueueHandle&, char* data, uint64_t offset, const uint64_t length) = 0;
diff --git a/cpp/src/qpid/broker/ConfigHandle.h b/cpp/src/qpid/broker/ConfigHandle.h
index e2cdca6f15..67009bf57a 100644
--- a/cpp/src/qpid/broker/ConfigHandle.h
+++ b/cpp/src/qpid/broker/ConfigHandle.h
@@ -44,8 +44,6 @@ public:
// <none>
private:
- typedef qpid::asyncStore::ConfigHandleImpl Impl;
- Impl* impl;
friend class qpid::messaging::PrivateImplRef<ConfigHandle>;
};
diff --git a/cpp/src/qpid/broker/EnqueueHandle.h b/cpp/src/qpid/broker/EnqueueHandle.h
index 3ab6885497..6053d1879c 100644
--- a/cpp/src/qpid/broker/EnqueueHandle.h
+++ b/cpp/src/qpid/broker/EnqueueHandle.h
@@ -44,8 +44,6 @@ public:
// <none>
private:
- typedef qpid::asyncStore::EnqueueHandleImpl Impl;
- Impl* impl;
friend class qpid::messaging::PrivateImplRef<EnqueueHandle>;
};
diff --git a/cpp/src/qpid/broker/EventHandle.h b/cpp/src/qpid/broker/EventHandle.h
index 355cb3a091..8ded98be4a 100644
--- a/cpp/src/qpid/broker/EventHandle.h
+++ b/cpp/src/qpid/broker/EventHandle.h
@@ -44,8 +44,6 @@ public:
const std::string& getKey() const;
private:
- typedef qpid::asyncStore::EventHandleImpl Impl;
- Impl* impl;
friend class qpid::messaging::PrivateImplRef<EventHandle>;
};
diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h
index 9339d81f32..739c53f7d3 100644
--- a/cpp/src/qpid/broker/MessageHandle.h
+++ b/cpp/src/qpid/broker/MessageHandle.h
@@ -45,8 +45,6 @@ public:
// <none>
private:
- //typedef qpid::asyncStore::MessageHandleImpl Impl;
- //Impl* impl;
friend class qpid::messaging::PrivateImplRef<MessageHandle>;
};
diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h
index a8caa03f97..cb366e2880 100644
--- a/cpp/src/qpid/broker/QueueHandle.h
+++ b/cpp/src/qpid/broker/QueueHandle.h
@@ -44,8 +44,6 @@ public:
const std::string& getName() const;
private:
- typedef qpid::asyncStore::QueueHandleImpl Impl;
- Impl* impl;
friend class qpid::messaging::PrivateImplRef<QueueHandle>;
};
diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h
index 814b4ea0b3..5981b89026 100644
--- a/cpp/src/qpid/broker/TxnHandle.h
+++ b/cpp/src/qpid/broker/TxnHandle.h
@@ -45,8 +45,6 @@ public:
bool is2pc() const;
private:
- typedef qpid::asyncStore::TxnHandleImpl Impl;
- Impl* impl;
friend class qpid::messaging::PrivateImplRef<TxnHandle>;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 6042291a0a..89b9b5b9b5 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -46,6 +46,7 @@ MessageConsumer::~MessageConsumer()
void*
MessageConsumer::runConsumers()
{
+/*
uint32_t numMsgs = 0;
while (numMsgs < m_perfTestParams.m_numMsgs) {
if (m_queue->dispatch()) {
@@ -54,6 +55,7 @@ MessageConsumer::runConsumers()
::usleep(1000); // TODO - replace this poller with condition variable
}
}
+*/
return 0;
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
index 49d656aee4..1106069560 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
@@ -30,7 +30,7 @@
#include "QueuedMessage.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
-#include "qpid/broker/AsyncResultQueue.h"
+#include "qpid/broker/AsyncResultHandle.h"
namespace tests {
namespace storePerftools {
@@ -39,11 +39,11 @@ namespace asyncPerf {
MockPersistableQueue::MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
qpid::asyncStore::AsyncStoreImpl* store,
- qpid::broker::AsyncResultQueue& resultQueue) :
+ qpid::broker::AsyncResultQueue& arq) :
qpid::broker::PersistableQueue(),
m_name(name),
m_store(store),
- m_resultQueue(resultQueue),
+ m_resultQueue(arq),
m_asyncOpCounter(0UL),
m_persistenceId(0ULL),
m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this.
@@ -67,18 +67,17 @@ MockPersistableQueue::~MockPersistableQueue()
}
// static
-/*
void
-MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
- qpid::broker::BrokerAsyncContext* bc)
+MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh)
{
- if (bc && res) {
- QueueAsyncContext* qc = dynamic_cast<QueueAsyncContext*>(bc);
- if (res->errNo) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing a message)
std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
- << res->errNo << " (" << res->errMsg << ")" << std::endl;
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
} else {
+//std::cout << "QQQ MockPersistableQueue::handleAsyncResult() op=" << qc->getOpStr() << std::endl << std::flush;
// Handle async success here
switch(qc->getOpCode()) {
case qpid::asyncStore::AsyncOperation::QUEUE_CREATE:
@@ -103,10 +102,7 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
};
}
}
- if (bc) delete bc;
- if (res) delete res;
}
-*/
const qpid::broker::QueueHandle&
MockPersistableQueue::getHandle() const
@@ -129,14 +125,14 @@ MockPersistableQueue::getStore()
void
MockPersistableQueue::asyncCreate()
{
- qpid::broker::ResultCallback rcb = &qpid::broker::AsyncResultQueue::submit;
if (m_store) {
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ qpid::asyncStore::AsyncOperation::QUEUE_CREATE,
+ &handleAsyncResult,
+ &m_resultQueue));
m_store->submitCreate(m_queueHandle,
this,
- rcb,
-// &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
- new QueueAsyncContext(shared_from_this(),
- qpid::asyncStore::AsyncOperation::QUEUE_CREATE));
+ qac);
++m_asyncOpCounter;
}
}
@@ -147,10 +143,12 @@ MockPersistableQueue::asyncDestroy(const bool deleteQueue)
m_destroyPending = true;
if (m_store) {
if (deleteQueue) {
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ qpid::asyncStore::AsyncOperation::QUEUE_DESTROY,
+ &handleAsyncResult,
+ &m_resultQueue));
m_store->submitDestroy(m_queueHandle,
- &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
- new QueueAsyncContext(shared_from_this(),
- qpid::asyncStore::AsyncOperation::QUEUE_DESTROY));
+ qac);
++m_asyncOpCounter;
}
m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000));
@@ -162,6 +160,7 @@ MockPersistableQueue::deliver(boost::shared_ptr<MockPersistableMessage> msg)
{
QueuedMessage qm(this, msg);
if(enqueue((MockTransactionContext*)0, qm)) {
+ // TODO: Do we need to wait for the enqueue to complete before push()ing the msg?
push(qm);
}
}
@@ -334,12 +333,14 @@ MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn,
{
qm.payload()->setPersistenceId(m_store->getNextRid());
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
- m_store->submitEnqueue(/*enqHandle*/qm.enqHandle(),
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ qm.payload(),
+ qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
+ &handleAsyncResult,
+ &m_resultQueue));
+ m_store->submitEnqueue(qm.enqHandle(),
txn->getHandle(),
- &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
- new QueueAsyncContext(shared_from_this(),
- qm.payload(),
- qpid::asyncStore::AsyncOperation::MSG_ENQUEUE));
+ qac);
++m_asyncOpCounter;
return true;
}
@@ -350,13 +351,14 @@ MockPersistableQueue::asyncDequeue(MockTransactionContext* txn,
QueuedMessage& qm)
{
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
- qpid::broker::EnqueueHandle enqHandle = qm.enqHandle();
- m_store->submitDequeue(enqHandle,
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ qm.payload(),
+ qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
+ &handleAsyncResult,
+ &m_resultQueue));
+ m_store->submitDequeue(qm.enqHandle(),
txn->getHandle(),
- &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
- new QueueAsyncContext(shared_from_this(),
- qm.payload(),
- qpid::asyncStore::AsyncOperation::MSG_DEQUEUE));
+ qac);
++m_asyncOpCounter;
return true;
}
@@ -374,7 +376,7 @@ MockPersistableQueue::destroyCheck(const std::string& opDescr) const
// protected
void
-MockPersistableQueue::createComplete(const QueueAsyncContext* qc)
+MockPersistableQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc)
{
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
@@ -383,7 +385,7 @@ MockPersistableQueue::createComplete(const QueueAsyncContext* qc)
// protected
void
-MockPersistableQueue::flushComplete(const QueueAsyncContext* qc)
+MockPersistableQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc)
{
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
@@ -392,7 +394,7 @@ MockPersistableQueue::flushComplete(const QueueAsyncContext* qc)
// protected
void
-MockPersistableQueue::destroyComplete(const QueueAsyncContext* qc)
+MockPersistableQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc)
{
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
@@ -401,7 +403,7 @@ MockPersistableQueue::destroyComplete(const QueueAsyncContext* qc)
}
void
-MockPersistableQueue::enqueueComplete(const QueueAsyncContext* qc)
+MockPersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
{
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
assert(qc->getQueue().get() == this);
@@ -409,7 +411,7 @@ MockPersistableQueue::enqueueComplete(const QueueAsyncContext* qc)
}
void
-MockPersistableQueue::dequeueComplete(const QueueAsyncContext* qc)
+MockPersistableQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
{
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
assert(qc->getQueue().get() == this);
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
index e62aeec420..bb68015b95 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
@@ -64,11 +64,10 @@ public:
MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& args,
qpid::asyncStore::AsyncStoreImpl* store,
- qpid::broker::AsyncResultQueue& rq);
+ qpid::broker::AsyncResultQueue& arq);
virtual ~MockPersistableQueue();
-// static void handleAsyncResult(const qpid::broker::AsyncResult* res,
-// qpid::broker::BrokerAsyncContext* bc);
+ static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res);
const qpid::broker::QueueHandle& getHandle() const;
qpid::broker::QueueHandle& getHandle();
qpid::asyncStore::AsyncStoreImpl* getStore();
@@ -143,11 +142,11 @@ protected:
void destroyCheck(const std::string& opDescr) const;
// --- Async op completions (called through handleAsyncResult) ---
- void createComplete(const QueueAsyncContext* qc);
- void flushComplete(const QueueAsyncContext* qc);
- void destroyComplete(const QueueAsyncContext* qc);
- void enqueueComplete(const QueueAsyncContext* qc);
- void dequeueComplete(const QueueAsyncContext* qc);
+ void createComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void flushComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc);
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index 66e0bb3dbf..09518c7c61 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -32,7 +32,6 @@
#include "tests/storePerftools/common/Thread.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
-#include "qpid/broker/AsyncResultQueue.h"
#include "qpid/sys/Poller.h"
#include <iomanip>
@@ -70,7 +69,7 @@ PerfTest::~PerfTest()
void
PerfTest::prepareStore()
{
- m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts, &m_resultQueue);
+ m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts);
m_store->initialize();
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
index 46455e4af0..651d97f9fc 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
@@ -28,7 +28,7 @@
#include "tests/storePerftools/common/Streamable.h"
-#include "qpid/broker/AsyncResultQueue.h"
+#include "qpid/broker/AsyncResultQueueImpl.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Thread.h"
@@ -70,7 +70,7 @@ protected:
const char* m_msgData;
boost::shared_ptr<qpid::sys::Poller> m_poller;
qpid::sys::Thread m_pollingThread;
- qpid::broker::AsyncResultQueue m_resultQueue;
+ qpid::broker::AsyncResultQueueImpl m_resultQueue;
qpid::asyncStore::AsyncStoreImpl* m_store;
std::deque<boost::shared_ptr<MockPersistableQueue> > m_queueList;
std::deque<boost::shared_ptr<MessageProducer> > m_producers;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
index be0c087390..513175ab41 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
@@ -30,19 +30,27 @@ namespace storePerftools {
namespace asyncPerf {
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q,
- const qpid::asyncStore::AsyncOperation::opCode op) :
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ qpid::broker::AsyncResultCallback rcb,
+ qpid::broker::AsyncResultQueue* const arq) :
m_q(q),
- m_op(op)
+ m_op(op),
+ m_rcb(rcb),
+ m_arq(arq)
{
assert(m_q.get() != 0);
}
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q,
boost::shared_ptr<MockPersistableMessage> msg,
- const qpid::asyncStore::AsyncOperation::opCode op) :
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ qpid::broker::AsyncResultCallback rcb,
+ qpid::broker::AsyncResultQueue* const arq) :
m_q(q),
m_msg(msg),
- m_op(op)
+ m_op(op),
+ m_rcb(rcb),
+ m_arq(arq)
{
assert(m_q.get() != 0);
assert(m_msg.get() != 0);
@@ -75,6 +83,24 @@ QueueAsyncContext::getMessage() const
return m_msg;
}
+qpid::broker::AsyncResultQueue*
+QueueAsyncContext::getAsyncResultQueue() const
+{
+ return m_arq;
+}
+
+qpid::broker::AsyncResultCallback
+QueueAsyncContext::getAsyncResultCallback() const
+{
+ return m_rcb;
+}
+
+void
+QueueAsyncContext::invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const
+{
+ m_rcb(arh);
+}
+
void
QueueAsyncContext::destroy()
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
index 2b6b3778cd..ab68b6b39f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
@@ -29,6 +29,8 @@
#include <boost/shared_ptr.hpp>
+
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
@@ -40,21 +42,30 @@ class QueueAsyncContext: public qpid::broker::BrokerAsyncContext
{
public:
QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q,
- const qpid::asyncStore::AsyncOperation::opCode op);
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ qpid::broker::AsyncResultCallback rcb,
+ qpid::broker::AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q,
boost::shared_ptr<MockPersistableMessage> msg,
- const qpid::asyncStore::AsyncOperation::opCode op);
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ qpid::broker::AsyncResultCallback rcb,
+ qpid::broker::AsyncResultQueue* const arq);
virtual ~QueueAsyncContext();
qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
const char* getOpStr() const;
boost::shared_ptr<MockPersistableQueue> getQueue() const;
boost::shared_ptr<MockPersistableMessage> getMessage() const;
+ qpid::broker::AsyncResultQueue* getAsyncResultQueue() const;
+ qpid::broker::AsyncResultCallback getAsyncResultCallback() const;
+ void invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const;
void destroy();
protected:
boost::shared_ptr<MockPersistableQueue> m_q;
boost::shared_ptr<MockPersistableMessage> m_msg;
const qpid::asyncStore::AsyncOperation::opCode m_op;
+ qpid::broker::AsyncResultCallback m_rcb;
+ qpid::broker::AsyncResultQueue* const m_arq;
};
}}} // namespace tests::storePerftools::asyncPerf