summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/asyncStore/OperationQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/asyncStore/OperationQueue.cpp')
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp24
1 files changed, 17 insertions, 7 deletions
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index 69ddf7645e..f13114f41e 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -23,11 +23,15 @@
#include "OperationQueue.h"
+#include "qpid/broker/AsyncResultHandle.h"
+
namespace qpid {
namespace asyncStore {
-OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller) :
- m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller)
+OperationQueue::OperationQueue(const boost::shared_ptr<qpid::sys::Poller>& poller,
+ qpid::broker::AsyncResultQueue* resultQueue) :
+ m_opQueue(boost::bind(&OperationQueue::handle, this, _1), poller),
+ m_resultQueue(resultQueue)
{
m_opQueue.start();
}
@@ -40,7 +44,7 @@ OperationQueue::~OperationQueue()
void
OperationQueue::submit(const AsyncOperation* op)
{
-//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
+std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
m_opQueue.push(op);
}
@@ -49,11 +53,17 @@ OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
{
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
-//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
- if ((*i)->m_resCb) {
- ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt);
+std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
+ qpid::broker::BrokerAsyncContext* bc = (*i)->m_brokerCtxt;
+ qpid::broker::ResultCallback rcb = (*i)->m_resCb;
+ if (rcb) {
+// ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt);
+// rcb(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc)));
+ if (m_resultQueue) {
+ (m_resultQueue->*rcb)(new qpid::broker::AsyncResultHandle(new qpid::broker::AsyncResultHandleImpl(bc)));
+ }
} else {
- delete (*i)->m_brokerCtxt;
+ delete bc;
}
delete (*i);
}