diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-06-08 14:59:04 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-08 14:59:04 +0000 |
commit | 56094d9861141097e107a71dac4c808e0aca9c5f (patch) | |
tree | 4a62659c9d6c7dc47549b486483c1afa45183a9a | |
parent | 22d453646b4815752134ad62e0b27841a103afb2 (diff) | |
download | qpid-python-56094d9861141097e107a71dac4c808e0aca9c5f.tar.gz |
QPID-3858: WIP - completed async return path
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1348098 13f79535-47bb-0310-9956-ffa450edef68
30 files changed, 375 insertions, 436 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index b5ac6af825..575c78e320 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1078,7 +1078,7 @@ set (qpidbroker_SOURCES qpid/amqp_0_10/Connection.cpp qpid/broker/AsyncResultHandle.cpp qpid/broker/AsyncResultHandleImpl.cpp - qpid/broker/AsyncResultQueue.cpp + qpid/broker/AsyncResultQueueImpl.cpp qpid/broker/AsyncStore.cpp qpid/broker/Broker.cpp qpid/broker/Credit.cpp diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.cpp b/cpp/src/qpid/asyncStore/AsyncOperation.cpp index 19b15b12d0..0d8ff1535e 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.cpp +++ b/cpp/src/qpid/asyncStore/AsyncOperation.cpp @@ -34,60 +34,50 @@ AsyncOperation::AsyncOperation() : m_op(NONE), m_targetHandle(), m_dataSrc(0), - m_txnHandle(0), - m_resCb(0), - m_brokerCtxt(0) + m_txnHandle(0) {} AsyncOperation::AsyncOperation(const opCode op, const qpid::broker::IdHandle* th, - const qpid::broker::ResultCallback resCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) : + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : m_op(op), m_targetHandle(th), m_dataSrc(0), m_txnHandle(0), - m_resCb(resCb), m_brokerCtxt(brokerCtxt) {} AsyncOperation::AsyncOperation(const opCode op, const qpid::broker::IdHandle* th, - const qpid::broker::DataSource* dataSrc, - const qpid::broker::ResultCallback resCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) : + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : m_op(op), m_targetHandle(th), m_dataSrc(dataSrc), m_txnHandle(0), - m_resCb(resCb), m_brokerCtxt(brokerCtxt) {} AsyncOperation::AsyncOperation(const opCode op, const qpid::broker::IdHandle* th, const qpid::broker::TxnHandle* txnHandle, - const qpid::broker::ResultCallback resCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) : + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : m_op(op), m_targetHandle(th), m_dataSrc(0), m_txnHandle(txnHandle), - m_resCb(resCb), m_brokerCtxt(brokerCtxt) {} AsyncOperation::AsyncOperation(const opCode op, const qpid::broker::IdHandle* th, - const qpid::broker::DataSource* dataSrc, + const qpid::broker::DataSource* const dataSrc, const qpid::broker::TxnHandle* txnHandle, - const qpid::broker::ResultCallback resCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) : + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) : m_op(op), m_targetHandle(th), m_dataSrc(dataSrc), m_txnHandle(txnHandle), - m_resCb(resCb), m_brokerCtxt(brokerCtxt) {} diff --git a/cpp/src/qpid/asyncStore/AsyncOperation.h b/cpp/src/qpid/asyncStore/AsyncOperation.h index 998d03955e..24f8baeb4d 100644 --- a/cpp/src/qpid/asyncStore/AsyncOperation.h +++ b/cpp/src/qpid/asyncStore/AsyncOperation.h @@ -50,34 +50,29 @@ public: AsyncOperation(); AsyncOperation(const opCode op, const qpid::broker::IdHandle* th, - const qpid::broker::ResultCallback resCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); AsyncOperation(const opCode op, const qpid::broker::IdHandle* th, - const qpid::broker::DataSource* dataSrc, - const qpid::broker::ResultCallback resCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); AsyncOperation(const opCode op, const qpid::broker::IdHandle* th, const qpid::broker::TxnHandle* txnHandle, - const qpid::broker::ResultCallback resCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); AsyncOperation(const opCode op, const qpid::broker::IdHandle* th, - const qpid::broker::DataSource* dataSrc, + const qpid::broker::DataSource* const dataSrc, const qpid::broker::TxnHandle* txnHandle, - const qpid::broker::ResultCallback resCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); virtual ~AsyncOperation(); const char* getOpStr() const; static const char* getOpStr(const opCode op); opCode m_op; const qpid::broker::IdHandle* m_targetHandle; - const qpid::broker::DataSource* m_dataSrc; + const qpid::broker::DataSource* const m_dataSrc; const qpid::broker::TxnHandle* m_txnHandle; - const qpid::broker::ResultCallback m_resCb; - qpid::broker::BrokerAsyncContext* m_brokerCtxt; + boost::shared_ptr<qpid::broker::BrokerAsyncContext> const m_brokerCtxt; }; }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 6283d07ee9..a9fc13363a 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -38,12 +38,11 @@ namespace qpid { namespace asyncStore { AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, - const AsyncStoreOptions& opts, - qpid::broker::AsyncResultQueue* resultQueue) : + const AsyncStoreOptions& opts) : m_poller(poller), m_opts(opts), m_runState(), - m_operations(m_poller, resultQueue) + m_operations(m_poller) {} AsyncStoreImpl::~AsyncStoreImpl() @@ -63,23 +62,17 @@ void AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/) {} -qpid::broker::TxnHandle -AsyncStoreImpl::createTxnHandle(const std::string& xid) -{ - return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); -} - qpid::broker::ConfigHandle AsyncStoreImpl::createConfigHandle() { return qpid::broker::ConfigHandle(new ConfigHandleImpl()); } -qpid::broker::QueueHandle -AsyncStoreImpl::createQueueHandle(const std::string& name, - const qpid::types::Variant::Map& opts) +qpid::broker::EnqueueHandle +AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, + qpid::broker::QueueHandle& queueHandle) { - return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts)); + return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle)); } qpid::broker::EventHandle @@ -90,202 +83,178 @@ AsyncStoreImpl::createEventHandle(qpid::broker::QueueHandle& queueHandle, } qpid::broker::MessageHandle -AsyncStoreImpl::createMessageHandle(const qpid::broker::DataSource* dataSrc) +AsyncStoreImpl::createMessageHandle(const qpid::broker::DataSource* const dataSrc) { return qpid::broker::MessageHandle(new MessageHandleImpl(dataSrc)); } -qpid::broker::EnqueueHandle -AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, - qpid::broker::QueueHandle& queueHandle) +qpid::broker::QueueHandle +AsyncStoreImpl::createQueueHandle(const std::string& name, + const qpid::types::Variant::Map& opts) { - return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle)); + return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts)); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(const std::string& xid) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); } void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_PREPARE, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, + dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_COMMIT, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, + dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_ABORT, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, + dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, - const qpid::broker::DataSource* dataSrc, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), - dataSrc, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + dataSrc, + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, - const qpid::broker::DataSource* dataSrc, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), - dataSrc, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + dataSrc, + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_FLUSH, + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH, dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), - resultCb, - brokerCtxt); + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* dataSrc, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - dataSrc, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dataSrc, + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* dataSrc, + const qpid::broker::DataSource* const dataSrc, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - dataSrc, - &txnHandle, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dataSrc, + &txnHandle, + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - &txnHandle, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + &txnHandle, + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), - &txnHandle, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE, + dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + &txnHandle, + brokerCtxt)); m_operations.submit(op); -//delete op; -//delete brokerCtxt; } void AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), - &txnHandle, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE, + dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + &txnHandle, + brokerCtxt)); m_operations.submit(op); } diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 717723eda3..60365e0e8a 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -43,82 +43,71 @@ namespace asyncStore { class AsyncStoreImpl: public qpid::broker::AsyncStore { public: AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, - const AsyncStoreOptions& opts, - qpid::broker::AsyncResultQueue* resultQueue); + const AsyncStoreOptions& opts); virtual ~AsyncStoreImpl(); void initialize(); - uint64_t getNextRid(); + uint64_t getNextRid(); // Global counter for journal RIDs - // Management + // --- Management --- void initManagement(qpid::broker::Broker* broker); - // AsyncStore interface + // --- Factory methods for creating handles --- + - qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string()); qpid::broker::ConfigHandle createConfigHandle(); - qpid::broker::QueueHandle createQueueHandle(const std::string& name, - const qpid::types::Variant::Map& opts); - qpid::broker::EventHandle createEventHandle(qpid::broker::QueueHandle& queueHandle, - const std::string& key=std::string()); - qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* dataSrc); qpid::broker::EnqueueHandle createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, qpid::broker::QueueHandle& queueHandle); + qpid::broker::EventHandle createEventHandle(qpid::broker::QueueHandle& queueHandle, + const std::string& key=std::string()); + qpid::broker::MessageHandle createMessageHandle(const qpid::broker::DataSource* const dataSrc); + qpid::broker::QueueHandle createQueueHandle(const std::string& name, + const qpid::types::Variant::Map& opts); + qpid::broker::TxnHandle createTxnHandle(const std::string& xid=std::string()); + + + // --- Store async interface --- void submitPrepare(qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitCommit(qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitAbort(qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitCreate(qpid::broker::ConfigHandle& cfgHandle, - const qpid::broker::DataSource* dataSrc, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitDestroy(qpid::broker::ConfigHandle& cfgHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitCreate(qpid::broker::QueueHandle& queueHandle, - const qpid::broker::DataSource* dataSrc, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitDestroy(qpid::broker::QueueHandle& queueHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitFlush(qpid::broker::QueueHandle& queueHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* dataSrc, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* dataSrc, + const qpid::broker::DataSource* const dataSrc, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitDestroy(qpid::broker::EventHandle& eventHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); void submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt); // Legacy - Restore FTD message, is NOT async! virtual int loadContent(qpid::broker::MessageHandle& msgHandle, diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index f13114f41e..a455e445ab 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -28,10 +28,8 @@ namespace qpid { namespace asyncStore { -OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller, - qpid::broker::AsyncResultQueue* resultQueue) : - m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller), - m_resultQueue(resultQueue) +OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) : + m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller) { m_opQueue.start(); } @@ -42,9 +40,9 @@ OperationQueue::~OperationQueue() } void -OperationQueue::submit(const AsyncOperation* op) +OperationQueue::submit(boost::shared_ptr<const AsyncOperation> op) { -std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; +//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; m_opQueue.push(op); } @@ -53,19 +51,16 @@ OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { -std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; - qpid::broker::BrokerAsyncContext* bc = (*i)->m_brokerCtxt; - qpid::broker::ResultCallback rcb = (*i)->m_resCb; - if (rcb) { -// ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt); -// rcb(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc))); - if (m_resultQueue) { - (m_resultQueue->*rcb)(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc))); +//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; + boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->m_brokerCtxt; + if (bc) { + qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue(); + if (arq) { + qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc); + boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi)); + arq->submit(arh); } - } else { - delete bc; } - delete (*i); } return e.end(); } diff --git a/cpp/src/qpid/asyncStore/OperationQueue.h b/cpp/src/qpid/asyncStore/OperationQueue.h index eba7c829a3..23f1c0ee13 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.h +++ b/cpp/src/qpid/asyncStore/OperationQueue.h @@ -35,15 +35,13 @@ namespace asyncStore { class OperationQueue { public: - OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller, - qpid::broker::AsyncResultQueue* resultQueue = 0); + OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller); virtual ~OperationQueue(); - void submit(const AsyncOperation* op); + void submit(boost::shared_ptr<const AsyncOperation> op); protected: - typedef qpid::sys::PollableQueue<const AsyncOperation*> OpQueue; + typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncOperation> > OpQueue; OpQueue m_opQueue; - qpid::broker::AsyncResultQueue* m_resultQueue; OpQueue::Batch::const_iterator handle(const OpQueue::Batch& e); }; diff --git a/cpp/src/qpid/asyncStore/Plugin.cpp b/cpp/src/qpid/asyncStore/Plugin.cpp index 0441e9c082..4f35e8cd2a 100644 --- a/cpp/src/qpid/asyncStore/Plugin.cpp +++ b/cpp/src/qpid/asyncStore/Plugin.cpp @@ -41,7 +41,7 @@ Plugin::earlyInitialize(Target& target) m_options.m_storeDir = dataDir.getPath (); } - m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options, 0)); // TODO: last arg: point to broker instance of AsyncResultQueue + m_store.reset(new qpid::asyncStore::AsyncStoreImpl(broker->getPoller(), m_options)); boost::shared_ptr<qpid::broker::AsyncStore> brokerAsyncStore(m_store); broker->setAsyncStore(brokerAsyncStore); boost::function<void()> fn = boost::bind(&Plugin::finalize, this); diff --git a/cpp/src/qpid/broker/AsyncResultHandle.cpp b/cpp/src/qpid/broker/AsyncResultHandle.cpp index 26e46fee1c..cdd2231977 100644 --- a/cpp/src/qpid/broker/AsyncResultHandle.cpp +++ b/cpp/src/qpid/broker/AsyncResultHandle.cpp @@ -65,10 +65,16 @@ AsyncResultHandle::getErrMsg() const return impl->getErrMsg(); } -const BrokerAsyncContext* +boost::shared_ptr<BrokerAsyncContext> AsyncResultHandle::getBrokerAsyncContext() const { return impl->getBrokerAsyncContext(); } +void +AsyncResultHandle::invokeAsyncResultCallback() const +{ + impl->getBrokerAsyncContext()->invokeCallback(this); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultHandle.h b/cpp/src/qpid/broker/AsyncResultHandle.h index 6f6290bfcb..f916bde5d3 100644 --- a/cpp/src/qpid/broker/AsyncResultHandle.h +++ b/cpp/src/qpid/broker/AsyncResultHandle.h @@ -43,11 +43,10 @@ public: int getErrNo() const; std::string getErrMsg() const; - const BrokerAsyncContext* getBrokerAsyncContext() const; + boost::shared_ptr<BrokerAsyncContext> getBrokerAsyncContext() const; + void invokeAsyncResultCallback() const; private: - typedef qpid::broker::AsyncResultHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<AsyncResultHandle>; }; diff --git a/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp b/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp index 36d45e7b0a..c8950d8ff1 100644 --- a/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp +++ b/cpp/src/qpid/broker/AsyncResultHandleImpl.cpp @@ -28,17 +28,18 @@ namespace broker { AsyncResultHandleImpl::AsyncResultHandleImpl() : m_errNo(0), - m_errMsg(), - m_bc(0) + m_errMsg() {} -AsyncResultHandleImpl::AsyncResultHandleImpl(const BrokerAsyncContext* bc) : +AsyncResultHandleImpl::AsyncResultHandleImpl(boost::shared_ptr<BrokerAsyncContext> bc) : m_errNo(0), m_errMsg(), m_bc(bc) {} -AsyncResultHandleImpl::AsyncResultHandleImpl(const int errNo, const std::string& errMsg, const BrokerAsyncContext* bc) : +AsyncResultHandleImpl::AsyncResultHandleImpl(const int errNo, + const std::string& errMsg, + boost::shared_ptr<BrokerAsyncContext> bc) : m_errNo(errNo), m_errMsg(errMsg), m_bc(bc) @@ -59,7 +60,7 @@ AsyncResultHandleImpl::getErrMsg() const return m_errMsg; } -const BrokerAsyncContext* +boost::shared_ptr<BrokerAsyncContext> AsyncResultHandleImpl::getBrokerAsyncContext() const { return m_bc; diff --git a/cpp/src/qpid/broker/AsyncResultHandleImpl.h b/cpp/src/qpid/broker/AsyncResultHandleImpl.h index e1bd1fa0e9..4fe6d1248c 100644 --- a/cpp/src/qpid/broker/AsyncResultHandleImpl.h +++ b/cpp/src/qpid/broker/AsyncResultHandleImpl.h @@ -35,18 +35,20 @@ class AsyncResultHandleImpl : public virtual qpid::RefCounted { public: AsyncResultHandleImpl(); - AsyncResultHandleImpl(const BrokerAsyncContext* bc); - AsyncResultHandleImpl(const int errNo, const std::string& errMsg, const BrokerAsyncContext* bc); + AsyncResultHandleImpl(boost::shared_ptr<BrokerAsyncContext> bc); + AsyncResultHandleImpl(const int errNo, + const std::string& errMsg, + boost::shared_ptr<BrokerAsyncContext> bc); virtual ~AsyncResultHandleImpl(); int getErrNo() const; std::string getErrMsg() const; - const BrokerAsyncContext* getBrokerAsyncContext() const; + boost::shared_ptr<BrokerAsyncContext> getBrokerAsyncContext() const; private: const int m_errNo; const std::string m_errMsg; - const BrokerAsyncContext* m_bc; + boost::shared_ptr<BrokerAsyncContext> m_bc; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultQueue.cpp b/cpp/src/qpid/broker/AsyncResultQueue.cpp deleted file mode 100644 index 1094a582f4..0000000000 --- a/cpp/src/qpid/broker/AsyncResultQueue.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file AsyncResultQueue.cpp - */ - -#include "AsyncResultQueue.h" - -namespace qpid { -namespace broker { - -AsyncResultQueue::AsyncResultQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) : - m_resQueue(boost::bind(&AsyncResultQueue::handle, this, _1), poller) -{ - m_resQueue.start(); -} - -AsyncResultQueue::~AsyncResultQueue() -{ - m_resQueue.stop(); -} - -void -AsyncResultQueue::submit(AsyncResultHandle* res) -{ - m_resQueue.push(res); -} - -//static -/* -void -AsyncResultQueue::submit(AsyncResultQueue* arq, AsyncResultHandle* rh) -{ - arq->submit(rh); -} -*/ - -// protected -AsyncResultQueue::ResultQueue::Batch::const_iterator -AsyncResultQueue::handle(const ResultQueue::Batch& e) -{ - return e.end(); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp new file mode 100644 index 0000000000..8c99ce8ef2 --- /dev/null +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.cpp @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AsyncResultQueueImpl.cpp + */ + +#include "AsyncResultHandle.h" +#include "AsyncResultQueueImpl.h" + +namespace qpid { +namespace broker { + +AsyncResultQueueImpl::AsyncResultQueueImpl(const boost::shared_ptr<qpid::sys::Poller>& poller) : + m_resQueue(boost::bind(&AsyncResultQueueImpl::handle, this, _1), poller) +{ + m_resQueue.start(); +} + +AsyncResultQueueImpl::~AsyncResultQueueImpl() +{ + m_resQueue.stop(); +} + +void +AsyncResultQueueImpl::submit(boost::shared_ptr<AsyncResultHandle> arh) +{ +//std::cout << "==> AsyncResultQueueImpl::submit() errNo=" << arh->getErrNo() << " errMsg=\"" << arh->getErrMsg() << "\"" << std::endl << std::flush; + m_resQueue.push(arh); +} + +// protected +AsyncResultQueueImpl::ResultQueue::Batch::const_iterator +AsyncResultQueueImpl::handle(const ResultQueue::Batch& e) +{ + + for (ResultQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { +//std::cout << "<== AsyncResultQueueImpl::handle() errNo=" << (*i)->getErrNo() << " errMsg=\"" << (*i)->getErrMsg() << "\"" << std::endl << std::flush; + if ((*i)->isValid()) { + (*i)->invokeAsyncResultCallback(); + } + } + return e.end(); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncResultQueue.h b/cpp/src/qpid/broker/AsyncResultQueueImpl.h index 8881f25bac..fc93c2d806 100644 --- a/cpp/src/qpid/broker/AsyncResultQueue.h +++ b/cpp/src/qpid/broker/AsyncResultQueueImpl.h @@ -18,11 +18,13 @@ */ /** - * \file AsyncResultQueue.h + * \file AsyncResultQueueImpl.h */ -#ifndef qpid_broker_AsyncResultQueue_h_ -#define qpid_broker_AsyncResultQueue_h_ +#ifndef qpid_broker_AsyncResultQueueImpl_h_ +#define qpid_broker_AsyncResultQueueImpl_h_ + +#include "AsyncStore.h" #include "qpid/sys/PollableQueue.h" @@ -31,16 +33,15 @@ namespace broker { class AsyncResultHandle; -class AsyncResultQueue +class AsyncResultQueueImpl : public AsyncResultQueue { public: - AsyncResultQueue(const boost::shared_ptr<qpid::sys::Poller>& poller); - virtual ~AsyncResultQueue(); - void submit(AsyncResultHandle* rh); -// static void submit(AsyncResultQueue* arq, AsyncResultHandle* rh); + AsyncResultQueueImpl(const boost::shared_ptr<qpid::sys::Poller>& poller); + virtual ~AsyncResultQueueImpl(); + virtual void submit(boost::shared_ptr<AsyncResultHandle> arh); protected: - typedef qpid::sys::PollableQueue<const AsyncResultHandle*> ResultQueue; + typedef qpid::sys::PollableQueue<boost::shared_ptr<const AsyncResultHandle> > ResultQueue; ResultQueue m_resQueue; ResultQueue::Batch::const_iterator handle(const ResultQueue::Batch& e); @@ -48,4 +49,4 @@ protected: }} // namespace qpid::broker -#endif // qpid_broker_AsyncResultQueue_h_ +#endif // qpid_broker_AsyncResultQueueImpl_h_ diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/AsyncStore.cpp index d37b034648..10cb3d27cf 100644 --- a/cpp/src/qpid/broker/AsyncStore.cpp +++ b/cpp/src/qpid/broker/AsyncStore.cpp @@ -22,35 +22,16 @@ namespace qpid { namespace broker { -BrokerAsyncContext::~BrokerAsyncContext() +AsyncResultQueue::~AsyncResultQueue() {} -DataSource::~DataSource() +BrokerAsyncContext::~BrokerAsyncContext() {} -AsyncStore::AsyncStore() +DataSource::~DataSource() {} AsyncStore::~AsyncStore() {} -/* -AsyncResult::AsyncResult() : - errNo(0), - errMsg() -{} - -AsyncResult::AsyncResult(const int errNo, - const std::string& errMsg) : - errNo(errNo), - errMsg(errMsg) -{} - -void -AsyncResult::destroy() -{ - delete this; -} -*/ - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index c57bdaa552..7e2ee81620 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -24,48 +24,31 @@ // does not allow it. Using a local map<std::string, Variant> definition also precludes forward declaration. #include "qpid/types/Variant.h" // qpid::types::Variant::Map +#include <boost/shared_ptr.hpp> #include <stdint.h> #include <string> namespace qpid { namespace broker { -// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting -// Subclass this for specific contexts -class BrokerAsyncContext { -public: - virtual ~BrokerAsyncContext(); -}; - -// Callback definition: -//struct AsyncResult { -// int errNo; // 0 implies no error -// std::string errMsg; -// AsyncResult(); -// AsyncResult(const int errNo, -// const std::string& errMsg); -// void destroy(); -//}; -//typedef void (*ResultCallback)(const AsyncResult*, BrokerAsyncContext*); - +// This handle carries async op results class AsyncResultHandle; -class AsyncResultQueue; // Implements the result callback function -// Singleton class in broker which contains return pollable queue. Use submitAsyncResult() to add reulsts to queue. -class AsyncResultHandler { +// Broker to subclass as a pollable queue +class AsyncResultQueue { public: - virtual ~AsyncResultHandler(); - - // Factory method to create result handle - - virtual AsyncResultHandle createAsyncResultHandle(const int errNo, const std::string& errMsg, BrokerAsyncContext*) = 0; - - // Async return interface + virtual ~AsyncResultQueue(); + // TODO: Remove boost::shared_ptr<> from this interface + virtual void submit(boost::shared_ptr<AsyncResultHandle>) = 0; +}; - virtual void submitAsyncResult(AsyncResultHandle&) = 0; +// Subclass this for specific contexts +class BrokerAsyncContext { +public: + virtual ~BrokerAsyncContext(); + virtual AsyncResultQueue* getAsyncResultQueue() const = 0; + virtual void invokeCallback(const AsyncResultHandle* const) const = 0; }; -typedef void (qpid::broker::AsyncResultQueue::*ResultCallback)(AsyncResultHandle*); -//typedef void (qpid::broker::AsyncResultQueue::*ResultCallback)(AsyncResultQueue*, AsyncResultHandle*); class DataSource { public: @@ -74,6 +57,9 @@ public: virtual void write(char* target) = 0; }; +// Callback invoked by AsyncResultQueue to pass back async results +typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); + class ConfigHandle; class EnqueueHandle; class EventHandle; @@ -85,39 +71,39 @@ class TxnHandle; // Subclassed by store: class AsyncStore { public: - AsyncStore(); virtual ~AsyncStore(); - // Factory methods for creating handles + // --- Factory methods for creating handles --- virtual ConfigHandle createConfigHandle() = 0; virtual EnqueueHandle createEnqueueHandle(MessageHandle&, QueueHandle&) = 0; virtual EventHandle createEventHandle(QueueHandle&, const std::string& key=std::string()) = 0; - virtual MessageHandle createMessageHandle(const DataSource*) = 0; + virtual MessageHandle createMessageHandle(const DataSource* const) = 0; virtual QueueHandle createQueueHandle(const std::string& name, const qpid::types::Variant::Map& opts) = 0; - virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; + virtual TxnHandle createTxnHandle(const std::string& xid=std::string()) = 0; // Distr. txns must supply xid - // Store async interface + // --- Store async interface --- - virtual void submitPrepare(TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; // Distributed txns only - virtual void submitCommit(TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitAbort(TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; + // TODO: Remove boost::shared_ptr<> from this interface + virtual void submitPrepare(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Distributed txns only + virtual void submitCommit(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitAbort(TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitCreate(ConfigHandle&, const DataSource*, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDestroy(ConfigHandle&, ResultCallback, BrokerAsyncContext*) = 0; + virtual void submitCreate(ConfigHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(ConfigHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitCreate(QueueHandle&, const DataSource*, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDestroy(QueueHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitFlush(QueueHandle&, ResultCallback, BrokerAsyncContext*) = 0; + virtual void submitCreate(QueueHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitFlush(QueueHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitCreate(EventHandle&, const DataSource*, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitCreate(EventHandle&, const DataSource*, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDestroy(EventHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDestroy(EventHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; + virtual void submitCreate(EventHandle&, const DataSource* const, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitCreate(EventHandle&, const DataSource* const, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(EventHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDestroy(EventHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; - virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDequeue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; + virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; + virtual void submitDequeue(EnqueueHandle&, TxnHandle&, boost::shared_ptr<BrokerAsyncContext>) = 0; // Legacy - Restore FTD message, is NOT async! virtual int loadContent(MessageHandle&, QueueHandle&, char* data, uint64_t offset, const uint64_t length) = 0; diff --git a/cpp/src/qpid/broker/ConfigHandle.h b/cpp/src/qpid/broker/ConfigHandle.h index e2cdca6f15..67009bf57a 100644 --- a/cpp/src/qpid/broker/ConfigHandle.h +++ b/cpp/src/qpid/broker/ConfigHandle.h @@ -44,8 +44,6 @@ public: // <none> private: - typedef qpid::asyncStore::ConfigHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<ConfigHandle>; }; diff --git a/cpp/src/qpid/broker/EnqueueHandle.h b/cpp/src/qpid/broker/EnqueueHandle.h index 3ab6885497..6053d1879c 100644 --- a/cpp/src/qpid/broker/EnqueueHandle.h +++ b/cpp/src/qpid/broker/EnqueueHandle.h @@ -44,8 +44,6 @@ public: // <none> private: - typedef qpid::asyncStore::EnqueueHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<EnqueueHandle>; }; diff --git a/cpp/src/qpid/broker/EventHandle.h b/cpp/src/qpid/broker/EventHandle.h index 355cb3a091..8ded98be4a 100644 --- a/cpp/src/qpid/broker/EventHandle.h +++ b/cpp/src/qpid/broker/EventHandle.h @@ -44,8 +44,6 @@ public: const std::string& getKey() const; private: - typedef qpid::asyncStore::EventHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<EventHandle>; }; diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h index 9339d81f32..739c53f7d3 100644 --- a/cpp/src/qpid/broker/MessageHandle.h +++ b/cpp/src/qpid/broker/MessageHandle.h @@ -45,8 +45,6 @@ public: // <none> private: - //typedef qpid::asyncStore::MessageHandleImpl Impl; - //Impl* impl; friend class qpid::messaging::PrivateImplRef<MessageHandle>; }; diff --git a/cpp/src/qpid/broker/QueueHandle.h b/cpp/src/qpid/broker/QueueHandle.h index a8caa03f97..cb366e2880 100644 --- a/cpp/src/qpid/broker/QueueHandle.h +++ b/cpp/src/qpid/broker/QueueHandle.h @@ -44,8 +44,6 @@ public: const std::string& getName() const; private: - typedef qpid::asyncStore::QueueHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<QueueHandle>; }; diff --git a/cpp/src/qpid/broker/TxnHandle.h b/cpp/src/qpid/broker/TxnHandle.h index 814b4ea0b3..5981b89026 100644 --- a/cpp/src/qpid/broker/TxnHandle.h +++ b/cpp/src/qpid/broker/TxnHandle.h @@ -45,8 +45,6 @@ public: bool is2pc() const; private: - typedef qpid::asyncStore::TxnHandleImpl Impl; - Impl* impl; friend class qpid::messaging::PrivateImplRef<TxnHandle>; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 6042291a0a..89b9b5b9b5 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -46,6 +46,7 @@ MessageConsumer::~MessageConsumer() void* MessageConsumer::runConsumers() { +/* uint32_t numMsgs = 0; while (numMsgs < m_perfTestParams.m_numMsgs) { if (m_queue->dispatch()) { @@ -54,6 +55,7 @@ MessageConsumer::runConsumers() ::usleep(1000); // TODO - replace this poller with condition variable } } +*/ return 0; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp index 49d656aee4..1106069560 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp @@ -30,7 +30,7 @@ #include "QueuedMessage.h" #include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/AsyncResultQueue.h" +#include "qpid/broker/AsyncResultHandle.h" namespace tests { namespace storePerftools { @@ -39,11 +39,11 @@ namespace asyncPerf { MockPersistableQueue::MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, qpid::asyncStore::AsyncStoreImpl* store, - qpid::broker::AsyncResultQueue& resultQueue) : + qpid::broker::AsyncResultQueue& arq) : qpid::broker::PersistableQueue(), m_name(name), m_store(store), - m_resultQueue(resultQueue), + m_resultQueue(arq), m_asyncOpCounter(0UL), m_persistenceId(0ULL), m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. @@ -67,18 +67,17 @@ MockPersistableQueue::~MockPersistableQueue() } // static -/* void -MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerAsyncContext* bc) +MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh) { - if (bc && res) { - QueueAsyncContext* qc = dynamic_cast<QueueAsyncContext*>(bc); - if (res->errNo) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { // TODO: Handle async failure here (other than by simply printing a message) std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << res->errNo << " (" << res->errMsg << ")" << std::endl; + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; } else { +//std::cout << "QQQ MockPersistableQueue::handleAsyncResult() op=" << qc->getOpStr() << std::endl << std::flush; // Handle async success here switch(qc->getOpCode()) { case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: @@ -103,10 +102,7 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, }; } } - if (bc) delete bc; - if (res) delete res; } -*/ const qpid::broker::QueueHandle& MockPersistableQueue::getHandle() const @@ -129,14 +125,14 @@ MockPersistableQueue::getStore() void MockPersistableQueue::asyncCreate() { - qpid::broker::ResultCallback rcb = &qpid::broker::AsyncResultQueue::submit; if (m_store) { + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + qpid::asyncStore::AsyncOperation::QUEUE_CREATE, + &handleAsyncResult, + &m_resultQueue)); m_store->submitCreate(m_queueHandle, this, - rcb, -// &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, - new QueueAsyncContext(shared_from_this(), - qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); + qac); ++m_asyncOpCounter; } } @@ -147,10 +143,12 @@ MockPersistableQueue::asyncDestroy(const bool deleteQueue) m_destroyPending = true; if (m_store) { if (deleteQueue) { + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, + &handleAsyncResult, + &m_resultQueue)); m_store->submitDestroy(m_queueHandle, - &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, - new QueueAsyncContext(shared_from_this(), - qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); + qac); ++m_asyncOpCounter; } m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); @@ -162,6 +160,7 @@ MockPersistableQueue::deliver(boost::shared_ptr<MockPersistableMessage> msg) { QueuedMessage qm(this, msg); if(enqueue((MockTransactionContext*)0, qm)) { + // TODO: Do we need to wait for the enqueue to complete before push()ing the msg? push(qm); } } @@ -334,12 +333,14 @@ MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn, { qm.payload()->setPersistenceId(m_store->getNextRid()); //std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; - m_store->submitEnqueue(/*enqHandle*/qm.enqHandle(), + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + qm.payload(), + qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitEnqueue(qm.enqHandle(), txn->getHandle(), - &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, - new QueueAsyncContext(shared_from_this(), - qm.payload(), - qpid::asyncStore::AsyncOperation::MSG_ENQUEUE)); + qac); ++m_asyncOpCounter; return true; } @@ -350,13 +351,14 @@ MockPersistableQueue::asyncDequeue(MockTransactionContext* txn, QueuedMessage& qm) { //std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; - qpid::broker::EnqueueHandle enqHandle = qm.enqHandle(); - m_store->submitDequeue(enqHandle, + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + qm.payload(), + qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, + &handleAsyncResult, + &m_resultQueue)); + m_store->submitDequeue(qm.enqHandle(), txn->getHandle(), - &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, - new QueueAsyncContext(shared_from_this(), - qm.payload(), - qpid::asyncStore::AsyncOperation::MSG_DEQUEUE)); + qac); ++m_asyncOpCounter; return true; } @@ -374,7 +376,7 @@ MockPersistableQueue::destroyCheck(const std::string& opDescr) const // protected void -MockPersistableQueue::createComplete(const QueueAsyncContext* qc) +MockPersistableQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc) { //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); @@ -383,7 +385,7 @@ MockPersistableQueue::createComplete(const QueueAsyncContext* qc) // protected void -MockPersistableQueue::flushComplete(const QueueAsyncContext* qc) +MockPersistableQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc) { //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); @@ -392,7 +394,7 @@ MockPersistableQueue::flushComplete(const QueueAsyncContext* qc) // protected void -MockPersistableQueue::destroyComplete(const QueueAsyncContext* qc) +MockPersistableQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc) { //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); @@ -401,7 +403,7 @@ MockPersistableQueue::destroyComplete(const QueueAsyncContext* qc) } void -MockPersistableQueue::enqueueComplete(const QueueAsyncContext* qc) +MockPersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) { //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; assert(qc->getQueue().get() == this); @@ -409,7 +411,7 @@ MockPersistableQueue::enqueueComplete(const QueueAsyncContext* qc) } void -MockPersistableQueue::dequeueComplete(const QueueAsyncContext* qc) +MockPersistableQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) { //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; assert(qc->getQueue().get() == this); diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h index e62aeec420..bb68015b95 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h @@ -64,11 +64,10 @@ public: MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& args, qpid::asyncStore::AsyncStoreImpl* store, - qpid::broker::AsyncResultQueue& rq); + qpid::broker::AsyncResultQueue& arq); virtual ~MockPersistableQueue(); -// static void handleAsyncResult(const qpid::broker::AsyncResult* res, -// qpid::broker::BrokerAsyncContext* bc); + static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res); const qpid::broker::QueueHandle& getHandle() const; qpid::broker::QueueHandle& getHandle(); qpid::asyncStore::AsyncStoreImpl* getStore(); @@ -143,11 +142,11 @@ protected: void destroyCheck(const std::string& opDescr) const; // --- Async op completions (called through handleAsyncResult) --- - void createComplete(const QueueAsyncContext* qc); - void flushComplete(const QueueAsyncContext* qc); - void destroyComplete(const QueueAsyncContext* qc); - void enqueueComplete(const QueueAsyncContext* qc); - void dequeueComplete(const QueueAsyncContext* qc); + void createComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void flushComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc); }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 66e0bb3dbf..09518c7c61 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -32,7 +32,6 @@ #include "tests/storePerftools/common/Thread.h" #include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/AsyncResultQueue.h" #include "qpid/sys/Poller.h" #include <iomanip> @@ -70,7 +69,7 @@ PerfTest::~PerfTest() void PerfTest::prepareStore() { - m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts, &m_resultQueue); + m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); m_store->initialize(); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index 46455e4af0..651d97f9fc 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -28,7 +28,7 @@ #include "tests/storePerftools/common/Streamable.h" -#include "qpid/broker/AsyncResultQueue.h" +#include "qpid/broker/AsyncResultQueueImpl.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Thread.h" @@ -70,7 +70,7 @@ protected: const char* m_msgData; boost::shared_ptr<qpid::sys::Poller> m_poller; qpid::sys::Thread m_pollingThread; - qpid::broker::AsyncResultQueue m_resultQueue; + qpid::broker::AsyncResultQueueImpl m_resultQueue; qpid::asyncStore::AsyncStoreImpl* m_store; std::deque<boost::shared_ptr<MockPersistableQueue> > m_queueList; std::deque<boost::shared_ptr<MessageProducer> > m_producers; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp index be0c087390..513175ab41 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp @@ -30,19 +30,27 @@ namespace storePerftools { namespace asyncPerf { QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q, - const qpid::asyncStore::AsyncOperation::opCode op) : + const qpid::asyncStore::AsyncOperation::opCode op, + qpid::broker::AsyncResultCallback rcb, + qpid::broker::AsyncResultQueue* const arq) : m_q(q), - m_op(op) + m_op(op), + m_rcb(rcb), + m_arq(arq) { assert(m_q.get() != 0); } QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q, boost::shared_ptr<MockPersistableMessage> msg, - const qpid::asyncStore::AsyncOperation::opCode op) : + const qpid::asyncStore::AsyncOperation::opCode op, + qpid::broker::AsyncResultCallback rcb, + qpid::broker::AsyncResultQueue* const arq) : m_q(q), m_msg(msg), - m_op(op) + m_op(op), + m_rcb(rcb), + m_arq(arq) { assert(m_q.get() != 0); assert(m_msg.get() != 0); @@ -75,6 +83,24 @@ QueueAsyncContext::getMessage() const return m_msg; } +qpid::broker::AsyncResultQueue* +QueueAsyncContext::getAsyncResultQueue() const +{ + return m_arq; +} + +qpid::broker::AsyncResultCallback +QueueAsyncContext::getAsyncResultCallback() const +{ + return m_rcb; +} + +void +QueueAsyncContext::invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const +{ + m_rcb(arh); +} + void QueueAsyncContext::destroy() { diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index 2b6b3778cd..ab68b6b39f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -29,6 +29,8 @@ #include <boost/shared_ptr.hpp> + + namespace tests { namespace storePerftools { namespace asyncPerf { @@ -40,21 +42,30 @@ class QueueAsyncContext: public qpid::broker::BrokerAsyncContext { public: QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q, - const qpid::asyncStore::AsyncOperation::opCode op); + const qpid::asyncStore::AsyncOperation::opCode op, + qpid::broker::AsyncResultCallback rcb, + qpid::broker::AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q, boost::shared_ptr<MockPersistableMessage> msg, - const qpid::asyncStore::AsyncOperation::opCode op); + const qpid::asyncStore::AsyncOperation::opCode op, + qpid::broker::AsyncResultCallback rcb, + qpid::broker::AsyncResultQueue* const arq); virtual ~QueueAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; boost::shared_ptr<MockPersistableQueue> getQueue() const; boost::shared_ptr<MockPersistableMessage> getMessage() const; + qpid::broker::AsyncResultQueue* getAsyncResultQueue() const; + qpid::broker::AsyncResultCallback getAsyncResultCallback() const; + void invokeCallback(const qpid::broker::AsyncResultHandle* const arh) const; void destroy(); protected: boost::shared_ptr<MockPersistableQueue> m_q; boost::shared_ptr<MockPersistableMessage> m_msg; const qpid::asyncStore::AsyncOperation::opCode m_op; + qpid::broker::AsyncResultCallback m_rcb; + qpid::broker::AsyncResultQueue* const m_arq; }; }}} // namespace tests::storePerftools::asyncPerf |