diff options
Diffstat (limited to 'cpp/src/qpid/asyncStore/OperationQueue.cpp')
-rw-r--r-- | cpp/src/qpid/asyncStore/OperationQueue.cpp | 24 |
1 files changed, 17 insertions, 7 deletions
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 69ddf7645e..f13114f41e 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -23,11 +23,15 @@ #include "OperationQueue.h" +#include "qpid/broker/AsyncResultHandle.h" + namespace qpid { namespace asyncStore { -OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) : - m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller) +OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller, + qpid::broker::AsyncResultQueue* resultQueue) : + m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller), + m_resultQueue(resultQueue) { m_opQueue.start(); } @@ -40,7 +44,7 @@ OperationQueue::~OperationQueue() void OperationQueue::submit(const AsyncOperation* op) { -//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; +std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; m_opQueue.push(op); } @@ -49,11 +53,17 @@ OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { -//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; - if ((*i)->m_resCb) { - ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt); +std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; + qpid::broker::BrokerAsyncContext* bc = (*i)->m_brokerCtxt; + qpid::broker::ResultCallback rcb = (*i)->m_resCb; + if (rcb) { +// ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt); +// rcb(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc))); + if (m_resultQueue) { + (m_resultQueue->*rcb)(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc))); + } } else { - delete (*i)->m_brokerCtxt; + delete bc; } delete (*i); } |