summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp')
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp199
1 files changed, 84 insertions, 115 deletions
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 6283d07ee9..a9fc13363a 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -38,12 +38,11 @@ namespace qpid {
namespace asyncStore {
AsyncStoreImpl::AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
- const AsyncStoreOptions& opts,
- qpid::broker::AsyncResultQueue* resultQueue) :
+ const AsyncStoreOptions& opts) :
m_poller(poller),
m_opts(opts),
m_runState(),
- m_operations(m_poller, resultQueue)
+ m_operations(m_poller)
{}
AsyncStoreImpl::~AsyncStoreImpl()
@@ -63,23 +62,17 @@ void
AsyncStoreImpl::initManagement(qpid::broker::Broker* /*broker*/)
{}
-qpid::broker::TxnHandle
-AsyncStoreImpl::createTxnHandle(const std::string& xid)
-{
- return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
-}
-
qpid::broker::ConfigHandle
AsyncStoreImpl::createConfigHandle()
{
return qpid::broker::ConfigHandle(new ConfigHandleImpl());
}
-qpid::broker::QueueHandle
-AsyncStoreImpl::createQueueHandle(const std::string& name,
- const qpid::types::Variant::Map& opts)
+qpid::broker::EnqueueHandle
+AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
+ qpid::broker::QueueHandle& queueHandle)
{
- return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts));
+ return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle));
}
qpid::broker::EventHandle
@@ -90,202 +83,178 @@ AsyncStoreImpl::createEventHandle(qpid::broker::QueueHandle& queueHandle,
}
qpid::broker::MessageHandle
-AsyncStoreImpl::createMessageHandle(const qpid::broker::DataSource* dataSrc)
+AsyncStoreImpl::createMessageHandle(const qpid::broker::DataSource* const dataSrc)
{
return qpid::broker::MessageHandle(new MessageHandleImpl(dataSrc));
}
-qpid::broker::EnqueueHandle
-AsyncStoreImpl::createEnqueueHandle(qpid::broker::MessageHandle& msgHandle,
- qpid::broker::QueueHandle& queueHandle)
+qpid::broker::QueueHandle
+AsyncStoreImpl::createQueueHandle(const std::string& name,
+ const qpid::types::Variant::Map& opts)
{
- return qpid::broker::EnqueueHandle(new EnqueueHandleImpl(msgHandle, queueHandle));
+ return qpid::broker::QueueHandle(new QueueHandleImpl(name, opts));
+}
+
+qpid::broker::TxnHandle
+AsyncStoreImpl::createTxnHandle(const std::string& xid)
+{
+ return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
}
void
AsyncStoreImpl::submitPrepare(qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_PREPARE,
- dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_PREPARE,
+ dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCommit(qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_COMMIT,
- dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_COMMIT,
+ dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitAbort(qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::TXN_ABORT,
- dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::TXN_ABORT,
+ dynamic_cast<qpid::broker::IdHandle*>(&txnHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCreate(qpid::broker::ConfigHandle& cfgHandle,
- const qpid::broker::DataSource* dataSrc,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
- dataSrc,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
+ dataSrc,
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::ConfigHandle& cfgHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::CONFIG_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::CONFIG_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&cfgHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCreate(qpid::broker::QueueHandle& queueHandle,
- const qpid::broker::DataSource* dataSrc,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
- dataSrc,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
+ dataSrc,
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::QueueHandle& queueHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitFlush(qpid::broker::QueueHandle& queueHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::QUEUE_FLUSH,
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::QUEUE_FLUSH,
dynamic_cast<qpid::broker::IdHandle*>(&queueHandle),
- resultCb,
- brokerCtxt);
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
- const qpid::broker::DataSource* dataSrc,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ const qpid::broker::DataSource* const dataSrc,
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
- dataSrc,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ dataSrc,
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitCreate(qpid::broker::EventHandle& eventHandle,
- const qpid::broker::DataSource* dataSrc,
+ const qpid::broker::DataSource* const dataSrc,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_CREATE,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
- dataSrc,
- &txnHandle,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_CREATE,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ dataSrc,
+ &txnHandle,
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY,
- dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
- &txnHandle,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::EVENT_DESTROY,
+ dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
+ &txnHandle,
+ brokerCtxt));
m_operations.submit(op);
}
void
AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
- &txnHandle,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
+ dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
+ &txnHandle,
+ brokerCtxt));
m_operations.submit(op);
-//delete op;
-//delete brokerCtxt;
}
void
AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> brokerCtxt)
{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
- &txnHandle,
- resultCb,
- brokerCtxt);
+ boost::shared_ptr<const AsyncOperation> op(new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
+ dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
+ &txnHandle,
+ brokerCtxt));
m_operations.submit(op);
}