diff options
Diffstat (limited to 'cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp')
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 114 |
1 files changed, 48 insertions, 66 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 9135fcc27e..6b5c3ac582 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -23,7 +23,11 @@ #include "AsyncStoreImpl.h" -#include "AsyncOperation.h" +#include "ConfigHandleImpl.h" +#include "EnqueueHandleImpl.h" +#include "EventHandleImpl.h" +#include "MessageHandleImpl.h" +#include "QueueHandleImpl.h" #include "TxnHandleImpl.h" #include "qpid/broker/ConfigHandle.h" @@ -33,8 +37,6 @@ #include "qpid/broker/QueueHandle.h" #include "qpid/broker/TxnHandle.h" -#include <boost/intrusive_ptr.hpp> - namespace qpid { namespace asyncStore { @@ -88,6 +90,36 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid, return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb)); } +void +AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) +{ + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, + dynamic_cast<AsyncStoreHandle*>(&txnHandle), + brokerCtxt)); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) +{ + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, + dynamic_cast<AsyncStoreHandle*>(&txnHandle), + brokerCtxt)); + m_operations.submit(op); +} + +void +AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, + boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) +{ + boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, + dynamic_cast<AsyncStoreHandle*>(&txnHandle), + brokerCtxt)); + m_operations.submit(op); +} + qpid::broker::ConfigHandle AsyncStoreImpl::createConfigHandle() { @@ -98,14 +130,16 @@ qpid::broker::EnqueueHandle AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle, qpid::broker::QueueHandle& queueHandle) { - return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle)); + return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, + queueHandle)); } qpid::broker::EventHandle AsyncStoreImpl::createEventHandle(qpid::broker::QueueHandle& queueHandle, const std::string& key) { - return qpid::broker::EventHandle(new EventHandleImpl(queueHandle, key)); + return qpid::broker::EventHandle(new EventHandleImpl(queueHandle, + key)); } qpid::broker::MessageHandle @@ -123,42 +157,12 @@ AsyncStoreImpl::createQueueHandle(const std::string& name, } void -AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT, - dynamic_cast<qpid::broker::IdHandle*>(&txnHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle, const qpid::broker::DataSource* const dataSrc, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + dynamic_cast<AsyncStoreHandle*>(&cfgHandle), dataSrc, brokerCtxt)); m_operations.submit(op); @@ -169,7 +173,7 @@ AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle), + dynamic_cast<AsyncStoreHandle*>(&cfgHandle), brokerCtxt)); m_operations.submit(op); } @@ -180,7 +184,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + dynamic_cast<AsyncStoreHandle*>(&queueHandle), dataSrc, brokerCtxt)); m_operations.submit(op); @@ -191,7 +195,7 @@ AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), + dynamic_cast<AsyncStoreHandle*>(&queueHandle), brokerCtxt)); m_operations.submit(op); } @@ -201,19 +205,7 @@ AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH, - dynamic_cast<qpid::broker::IdHandle*>(&queueHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, - const qpid::broker::DataSource* const dataSrc, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - dataSrc, + dynamic_cast<AsyncStoreHandle*>(&queueHandle), brokerCtxt)); m_operations.submit(op); } @@ -225,7 +217,7 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dynamic_cast<AsyncStoreHandle*>(&eventHandle), dataSrc, &txnHandle, brokerCtxt)); @@ -234,21 +226,11 @@ AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, - boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) -{ - boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), - brokerCtxt)); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY, - dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), + dynamic_cast<AsyncStoreHandle*>(&eventHandle), &txnHandle, brokerCtxt)); m_operations.submit(op); @@ -260,7 +242,7 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + dynamic_cast<AsyncStoreHandle*>(&enqHandle), &txnHandle, brokerCtxt)); m_operations.submit(op); @@ -272,7 +254,7 @@ AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt) { boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), + dynamic_cast<AsyncStoreHandle*>(&enqHandle), &txnHandle, brokerCtxt)); m_operations.submit(op); |