diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-07-20 13:50:48 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-07-20 13:50:48 +0000 |
commit | dd79efca55e3ff0a0e0c25e395967ce9b3f80482 (patch) | |
tree | e10963e1718a58e3ee4cfaf41cd08bc6856d3b63 /cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp | |
parent | 2e437e1569009d8e8ed3ed896d751994e2e85d74 (diff) | |
download | qpid-python-dd79efca55e3ff0a0e0c25e395967ce9b3f80482.tar.gz |
QPID-3858: WIP: Moved QueueAsycnContext from namespace tests::storePerftools::asyncPerf to qpid::broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1363776 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp | 92 |
1 files changed, 49 insertions, 43 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index 79b8b46919..292fc35925 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -27,10 +27,10 @@ #include "MessageConsumer.h" #include "MessageDeque.h" #include "PersistableQueuedMessage.h" -#include "QueueAsyncContext.h" #include "SimpleMessage.h" #include "qpid/broker/AsyncResultHandle.h" +#include "qpid/broker/QueueAsyncContext.h" #include "qpid/broker/TxnHandle.h" #include <string.h> // memcpy() @@ -90,10 +90,10 @@ void SimpleQueue::asyncCreate() { if (m_store) { - boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), - s_nullTxnHandle, - &handleAsyncCreateResult, - &m_resultQueue)); + boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), + s_nullTxnHandle, + &handleAsyncCreateResult, + &m_resultQueue)); m_store->submitCreate(m_queueHandle, this, qac); ++m_asyncOpCounter; } @@ -103,13 +103,15 @@ SimpleQueue::asyncCreate() void SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) { if (arh) { - boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = + boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); if (arh->getErrNo()) { // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; } else { - qc->getQueue()->createComplete(qc); + sq->createComplete(qc); } } } @@ -120,10 +122,10 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) m_destroyPending = true; if (m_store) { if (deleteQueue) { - boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), - s_nullTxnHandle, - &handleAsyncDestroyResult, - &m_resultQueue)); + boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), + s_nullTxnHandle, + &handleAsyncDestroyResult, + &m_resultQueue)); m_store->submitDestroy(m_queueHandle, qac); ++m_asyncOpCounter; } @@ -135,13 +137,15 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) void SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) { if (arh) { - boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = + boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); if (arh->getErrNo()) { // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; } else { - qc->getQueue()->destroyComplete(qc); + sq->destroyComplete(qc); } } } @@ -356,18 +360,16 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, { assert(pqm.get()); // qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this - boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), - pqm->payload(), - th, - &handleAsyncEnqueueResult, - &m_resultQueue)); - // TODO : This must be done from inside store, not here + boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), + pqm->payload(), + th, + &handleAsyncEnqueueResult, + &m_resultQueue)); + // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) if (th.isValid()) { th.incrOpCnt(); } - m_store->submitEnqueue(pqm->enqHandle(), - th, - qac); + m_store->submitEnqueue(pqm->enqHandle(), th, qac); ++m_asyncOpCounter; return true; } @@ -376,13 +378,15 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, void SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) { if (arh) { - boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = + boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); if (arh->getErrNo()) { // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; } else { - qc->getQueue()->enqueueComplete(qc); + sq->enqueueComplete(qc); } } } @@ -393,12 +397,12 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, boost::shared_ptr<PersistableQueuedMessage> pqm) { assert(pqm.get()); - boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), - pqm->payload(), - th, - &handleAsyncDequeueResult, - &m_resultQueue)); - // TODO : This must be done from inside store, not here + boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), + pqm->payload(), + th, + &handleAsyncDequeueResult, + &m_resultQueue)); + // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) if (th.isValid()) { th.incrOpCnt(); } @@ -412,13 +416,15 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, void SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) { if (arh) { - boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = + boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); if (arh->getErrNo()) { // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; } else { - qc->getQueue()->dequeueComplete(qc); + sq->dequeueComplete(qc); } } } @@ -436,7 +442,7 @@ SimpleQueue::destroyCheck(const std::string& opDescr) const // private void -SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc) +SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { assert(qc->getQueue().get() == this); --m_asyncOpCounter; @@ -444,7 +450,7 @@ SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc) // private void -SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc) +SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { assert(qc->getQueue().get() == this); --m_asyncOpCounter; @@ -452,7 +458,7 @@ SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc) // private void -SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc) +SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { assert(qc->getQueue().get() == this); --m_asyncOpCounter; @@ -461,12 +467,12 @@ SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc) // private void -SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) +SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { assert(qc->getQueue().get() == this); --m_asyncOpCounter; - // TODO : This must be done from inside store, not here + // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) qpid::broker::TxnHandle th = qc->getTxnHandle(); if (th.isValid()) { // transactional enqueue th.decrOpCnt(); @@ -475,12 +481,12 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) // private void -SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) +SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { assert(qc->getQueue().get() == this); --m_asyncOpCounter; - // TODO : This must be done from inside store, not here + // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) qpid::broker::TxnHandle th = qc->getTxnHandle(); if (th.isValid()) { // transactional enqueue th.decrOpCnt(); |