diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-07-16 13:54:11 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-07-16 13:54:11 +0000 |
commit | a804510d81ade0594a75b5c9b8765cafcc233245 (patch) | |
tree | 8c6be643564b6d8c88619d17de7150c98a314781 /cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp | |
parent | 1ab07197127e990da2c765ea0ffa5fd8ca47b7b6 (diff) | |
download | qpid-python-a804510d81ade0594a75b5c9b8765cafcc233245.tar.gz |
QPID-3858: Refactor to tidy up several class design issues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1362039 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp | 86 |
1 files changed, 58 insertions, 28 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index 8bb79367ed..3bce2fb52a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -26,13 +26,15 @@ #include "DeliveryRecord.h" #include "MessageConsumer.h" #include "MessageDeque.h" -#include "SimpleMessage.h" +#include "PersistableQueuedMessage.h" #include "QueueAsyncContext.h" #include "QueuedMessage.h" +#include "SimpleMessage.h" #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/broker/AsyncResultHandle.h" -#include "qpid/broker/TxnHandle.h" + +#include <boost/make_shared.hpp> namespace tests { namespace storePerftools { @@ -44,7 +46,7 @@ qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operat SimpleQueue::SimpleQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq) : qpid::broker::PersistableQueue(), m_name(name), @@ -117,7 +119,7 @@ SimpleQueue::getHandle() return m_queueHandle; } -qpid::asyncStore::AsyncStoreImpl* +qpid::broker::AsyncStore* SimpleQueue::getStore() { return m_store; @@ -161,15 +163,24 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) void SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { - QueuedMessage qm(this, msg); + boost::shared_ptr<QueuedMessage> qm; + if (msg->isPersistent() && m_store) { + qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + } else { + qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + } +//boost::shared_ptr<PersistableQueuedMessage> pqm1 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +//assert(pqm1.get()); enqueue(s_nullTxnHandle, qm); +//boost::shared_ptr<PersistableQueuedMessage> pqm2 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +//assert(pqm2.get()); push(qm); } bool SimpleQueue::dispatch(MessageConsumer& mc) { - QueuedMessage qm; + boost::shared_ptr<QueuedMessage> qm; if (m_messages->consume(qm)) { boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false)); mc.record(dr); @@ -179,43 +190,47 @@ SimpleQueue::dispatch(MessageConsumer& mc) } bool -SimpleQueue::enqueue(QueuedMessage& qm) +SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) { return enqueue(s_nullTxnHandle, qm); } bool SimpleQueue::enqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<QueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } - if (qm.payload()->isPersistent() && m_store) { - qm.payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(th, qm); + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->enqueueAsync(shared_from_this(), m_store); + return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm)); } return false; } bool -SimpleQueue::dequeue(QueuedMessage& qm) +SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) { return dequeue(s_nullTxnHandle, qm); } bool SimpleQueue::dequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<QueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } - if (qm.payload()->isPersistent() && m_store) { - qm.payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(th, qm); + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->dequeueAsync(shared_from_this(), m_store); +//assert(qm.get()); +//boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +//assert(pqm.get()); +//return asyncDequeue(th, pqm); + return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm)); } return true; } @@ -223,7 +238,12 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th, void SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { - QueuedMessage qm(this, msg); + boost::shared_ptr<QueuedMessage> qm; + if (msg->isPersistent() && m_store) { + qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + } else { + qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + } push(qm); } @@ -343,11 +363,13 @@ SimpleQueue::ScopedUse::~ScopedUse() // private void -SimpleQueue::push(QueuedMessage& qm, +SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, bool /*isRecovery*/) { - QueuedMessage removed; - m_messages->push(qm, removed); +boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +assert(pqm.get()); + + m_messages->push(qm); } // --- End Members & methods in msg handling path from qpid::Queue --- @@ -355,20 +377,22 @@ SimpleQueue::push(QueuedMessage& qm, // private bool SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<PersistableQueuedMessage> pqm) { - qm.payload()->setPersistenceId(m_store->getNextRid()); -//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(pqm.get()); +// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << pqm->payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), - qm.payload(), + pqm->payload(), th, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, &handleAsyncResult, &m_resultQueue)); + // TODO : This must be done from inside store, not here if (th.isValid()) { th.incrOpCnt(); } - m_store->submitEnqueue(qm.enqHandle(), + m_store->submitEnqueue(pqm->enqHandle(), th, qac); ++m_asyncOpCounter; @@ -378,19 +402,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, // private bool SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<PersistableQueuedMessage> pqm) { + assert(pqm.get()); //std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), - qm.payload(), + pqm->payload(), th, qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, &handleAsyncResult, &m_resultQueue)); + // TODO : This must be done from inside store, not here if (th.isValid()) { th.incrOpCnt(); } - m_store->submitDequeue(qm.enqHandle(), + m_store->submitDequeue(pqm->enqHandle(), th, qac); ++m_asyncOpCounter; @@ -445,6 +471,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) --m_asyncOpCounter; qpid::broker::TxnHandle th = qc->getTxnHandle(); + + // TODO : This must be done from inside store, not here if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } @@ -459,6 +487,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) --m_asyncOpCounter; qpid::broker::TxnHandle th = qc->getTxnHandle(); + + // TODO : This must be done from inside store, not here if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } |