diff options
Diffstat (limited to 'cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp')
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 199 |
1 files changed, 84 insertions, 115 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 6283d07ee9..a9fc13363a 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -38,12 +38,11 @@ namespace qpid { namespace asyncStore { AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, - const AsyncStoreOptions& opts, - qpid::broker::AsyncResultQueue* resultQueue) : + const AsyncStoreOptions& opts) : m_poller(poller), m_opts(opts), m_runState(), - m_operations(m_poller, resultQueue) + m_operations(m_poller) {} AsyncStoreImpl::~AsyncStoreImpl() @@ -63,23 +62,17 @@ void AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/) {} -qpid::broker::TxnHandle -AsyncStoreImpl::createTxnHandle(const std::string& xid) -{ - return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); -} - qpid::broker::ConfigHandle AsyncStoreImpl::createConfigHandle() { return qpid::broker::ConfigHandle(new ConfigHandleImpl()); } -qpid::broker::QueueHandle -AsyncStoreImpl::createQueueHandle(const std::string& name, - const qpid::types::Variant::Map& opts) +qpid::broker::EnqueueHandle +AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, + qpid::broker::QueueHandle& queueHandle) { - return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts)); + return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle)); } qpid::broker::EventHandle @@ -90,202 +83,178 @@ AsyncStoreImpl::createEventHandle(qpid::broker::QueueHandle& queueHandle, } qpid::broker::MessageHandle -AsyncStoreImpl::createMessageHandle(const qpid::broker::DataSource* dataSrc) +AsyncStoreImpl::createMessageHandle(const qpid::broker::DataSource* const dataSrc) { return qpid::broker::MessageHandle(new MessageHandleImpl(dataSrc)); } -qpid::broker::EnqueueHandle -AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, - qpid::broker::QueueHandle& queueHandle) +qpid::broker::QueueHandle +AsyncStoreImpl::createQueueHandle(const std::string& name, + const qpid::types::Variant::Map& opts) { - return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle)); + return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts)); +} + +qpid::broker::TxnHandle +AsyncStoreImpl::createTxnHandle(const std::string& xid) +{ + return qpid::broker::TxnHandle(new TxnHandleImpl(xid)); } void AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_PREPARE, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, + dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_COMMIT, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, + dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_ABORT, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, + dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, - const qpid::broker::DataSource* dataSrc, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), - dataSrc, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + dataSrc, + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, - const qpid::broker::DataSource* dataSrc, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), - dataSrc, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + dataSrc, + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_FLUSH, + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH, dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), - resultCb, - brokerCtxt); + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* dataSrc, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + const qpid::broker::DataSource* const dataSrc, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - dataSrc, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dataSrc, + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* dataSrc, + const qpid::broker::DataSource* const dataSrc, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - dataSrc, - &txnHandle, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dataSrc, + &txnHandle, + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - &txnHandle, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, + dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + &txnHandle, + brokerCtxt)); m_operations.submit(op); } void AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), - &txnHandle, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE, + dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + &txnHandle, + brokerCtxt)); m_operations.submit(op); -//delete op; -//delete brokerCtxt; } void AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { - AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), - &txnHandle, - resultCb, - brokerCtxt); + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE, + dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + &txnHandle, + brokerCtxt)); m_operations.submit(op); } |