diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp | 133 |
1 files changed, 71 insertions, 62 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index 2a6f2b208b..79b8b46919 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -33,7 +33,6 @@ #include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/TxnHandle.h" -#include <boost/make_shared.hpp> #include <string.h> // memcpy() namespace tests { @@ -69,43 +68,6 @@ SimpleQueue::SimpleQueue(const std::string& name, SimpleQueue::~SimpleQueue() {} -// static -void -SimpleQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh) -{ - if (arh) { - boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - // Handle async success here - switch(qc->getOpCode()) { - case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: - qc->getQueue()->createComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: - qc->getQueue()->flushComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: - qc->getQueue()->destroyComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: - qc->getQueue()->enqueueComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: - qc->getQueue()->dequeueComplete(qc); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::SimpleQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); - throw qpid::Exception(oss.str()); - }; - } - } -} - const qpid::broker::QueueHandle& SimpleQueue::getHandle() const { @@ -130,16 +92,28 @@ SimpleQueue::asyncCreate() if (m_store) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), s_nullTxnHandle, - qpid::asyncStore::AsyncOperation::QUEUE_CREATE, - &handleAsyncResult, + &handleAsyncCreateResult, &m_resultQueue)); - m_store->submitCreate(m_queueHandle, - this, - qac); + m_store->submitCreate(m_queueHandle, this, qac); ++m_asyncOpCounter; } } +//static +void +SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->createComplete(qc); + } + } +} + void SimpleQueue::asyncDestroy(const bool deleteQueue) { @@ -148,25 +122,38 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) if (deleteQueue) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), s_nullTxnHandle, - qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, - &handleAsyncResult, + &handleAsyncDestroyResult, &m_resultQueue)); - m_store->submitDestroy(m_queueHandle, - qac); + m_store->submitDestroy(m_queueHandle, qac); ++m_asyncOpCounter; } m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); } } +//static +void +SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->destroyComplete(qc); + } + } +} + void SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { boost::shared_ptr<QueuedMessage> qm; if (msg->isPersistent() && m_store) { - qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); } else { - qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)); } enqueue(s_nullTxnHandle, qm); push(qm); @@ -231,9 +218,9 @@ SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { boost::shared_ptr<QueuedMessage> qm; if (msg->isPersistent() && m_store) { - qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); } else { - qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)); } push(qm); } @@ -357,9 +344,6 @@ void SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, bool /*isRecovery*/) { -boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); -assert(pqm.get()); - m_messages->push(qm); } @@ -375,8 +359,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), pqm->payload(), th, - qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, - &handleAsyncResult, + &handleAsyncEnqueueResult, &m_resultQueue)); // TODO : This must be done from inside store, not here if (th.isValid()) { @@ -389,6 +372,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, return true; } +// private static +void +SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->enqueueComplete(qc); + } + } +} + // private bool SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, @@ -398,8 +396,7 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), pqm->payload(), th, - qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, - &handleAsyncResult, + &handleAsyncDequeueResult, &m_resultQueue)); // TODO : This must be done from inside store, not here if (th.isValid()) { @@ -411,6 +408,20 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, ++m_asyncOpCounter; return true; } +// private static +void +SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->dequeueComplete(qc); + } + } +} // private void @@ -455,9 +466,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) assert(qc->getQueue().get() == this); --m_asyncOpCounter; - qpid::broker::TxnHandle th = qc->getTxnHandle(); - // TODO : This must be done from inside store, not here + qpid::broker::TxnHandle th = qc->getTxnHandle(); if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } @@ -470,9 +480,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) assert(qc->getQueue().get() == this); --m_asyncOpCounter; - qpid::broker::TxnHandle th = qc->getTxnHandle(); - // TODO : This must be done from inside store, not here + qpid::broker::TxnHandle th = qc->getTxnHandle(); if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } |