summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/asyncStore/OperationQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/asyncStore/OperationQueue.cpp')
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp29
1 files changed, 12 insertions, 17 deletions
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index f13114f41e..a455e445ab 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -28,10 +28,8 @@
namespace qpid {
namespace asyncStore {
-OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller,
- qpid::broker::AsyncResultQueue* resultQueue) :
- m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller),
- m_resultQueue(resultQueue)
+OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) :
+ m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller)
{
m_opQueue.start();
}
@@ -42,9 +40,9 @@ OperationQueue::~OperationQueue()
}
void
-OperationQueue::submit(const AsyncOperation* op)
+OperationQueue::submit(boost::shared_ptr<const AsyncOperation> op)
{
-std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
+//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
m_opQueue.push(op);
}
@@ -53,19 +51,16 @@ OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
{
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
-std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
- qpid::broker::BrokerAsyncContext* bc = (*i)->m_brokerCtxt;
- qpid::broker::ResultCallback rcb = (*i)->m_resCb;
- if (rcb) {
-// ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt);
-// rcb(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc)));
- if (m_resultQueue) {
- (m_resultQueue->*rcb)(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc)));
+//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
+ boost::shared_ptr<qpid::broker::BrokerAsyncContext> bc = (*i)->m_brokerCtxt;
+ if (bc) {
+ qpid::broker::AsyncResultQueue* const arq = bc->getAsyncResultQueue();
+ if (arq) {
+ qpid::broker::AsyncResultHandleImpl* arhi = new qpid::broker::AsyncResultHandleImpl(bc);
+ boost::shared_ptr<qpid::broker::AsyncResultHandle> arh(new qpid::broker::AsyncResultHandle(arhi));
+ arq->submit(arh);
}
- } else {
- delete bc;
}
- delete (*i);
}
return e.end();
}