diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp | 37 |
1 files changed, 13 insertions, 24 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index 292fc35925..f297e83402 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -26,7 +26,7 @@ #include "DeliveryRecord.h" #include "MessageConsumer.h" #include "MessageDeque.h" -#include "PersistableQueuedMessage.h" +#include "QueuedMessage.h" #include "SimpleMessage.h" #include "qpid/broker/AsyncResultHandle.h" @@ -153,12 +153,7 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con void SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { - boost::shared_ptr<QueuedMessage> qm; - if (msg->isPersistent() && m_store) { - qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); - } else { - qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)); - } + boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg))); enqueue(s_nullTxnHandle, qm); push(qm); } @@ -191,7 +186,7 @@ SimpleQueue::enqueue(qpid::broker::TxnHandle& th, } if (qm->payload()->isPersistent() && m_store) { qm->payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm)); + return asyncEnqueue(th, qm); } return false; } @@ -212,7 +207,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th, } if (qm->payload()->isPersistent() && m_store) { qm->payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm)); + return asyncDequeue(th, qm); } return true; } @@ -220,13 +215,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th, void SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { - boost::shared_ptr<QueuedMessage> qm; - if (msg->isPersistent() && m_store) { - qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); - } else { - qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)); - } - push(qm); + push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg))); } void @@ -356,12 +345,12 @@ SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, // private bool SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, - boost::shared_ptr<PersistableQueuedMessage> pqm) + boost::shared_ptr<QueuedMessage> qm) { - assert(pqm.get()); + assert(qm.get()); // qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - pqm->payload(), + qm->payload(), th, &handleAsyncEnqueueResult, &m_resultQueue)); @@ -369,7 +358,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, if (th.isValid()) { th.incrOpCnt(); } - m_store->submitEnqueue(pqm->enqHandle(), th, qac); + m_store->submitEnqueue(qm->enqHandle(), th, qac); ++m_asyncOpCounter; return true; } @@ -394,11 +383,11 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con // private bool SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, - boost::shared_ptr<PersistableQueuedMessage> pqm) + boost::shared_ptr<QueuedMessage> qm) { - assert(pqm.get()); + assert(qm.get()); boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - pqm->payload(), + qm->payload(), th, &handleAsyncDequeueResult, &m_resultQueue)); @@ -406,7 +395,7 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, if (th.isValid()) { th.incrOpCnt(); } - m_store->submitDequeue(pqm->enqHandle(), + m_store->submitDequeue(qm->enqHandle(), th, qac); ++m_asyncOpCounter; |