diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp | 172 |
1 files changed, 72 insertions, 100 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index f297e83402..06b4e9333f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -31,7 +31,7 @@ #include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/QueueAsyncContext.h" -#include "qpid/broker/TxnHandle.h" +#include "qpid/broker/TxnBuffer.h" #include <string.h> // memcpy() @@ -65,33 +65,27 @@ SimpleQueue::SimpleQueue(const std::string& name, } } -SimpleQueue::~SimpleQueue() -{} +SimpleQueue::~SimpleQueue() {} const qpid::broker::QueueHandle& -SimpleQueue::getHandle() const -{ +SimpleQueue::getHandle() const { return m_queueHandle; } qpid::broker::QueueHandle& -SimpleQueue::getHandle() -{ +SimpleQueue::getHandle() { return m_queueHandle; } qpid::broker::AsyncStore* -SimpleQueue::getStore() -{ +SimpleQueue::getStore() { return m_store; } void -SimpleQueue::asyncCreate() -{ +SimpleQueue::asyncCreate() { if (m_store) { boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - s_nullTxnHandle, &handleAsyncCreateResult, &m_resultQueue)); m_store->submitCreate(m_queueHandle, this, qac); @@ -123,7 +117,6 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) if (m_store) { if (deleteQueue) { boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - s_nullTxnHandle, &handleAsyncDestroyResult, &m_resultQueue)); m_store->submitDestroy(m_queueHandle, qac); @@ -151,16 +144,14 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con } void -SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) -{ +SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg))); - enqueue(s_nullTxnHandle, qm); + enqueue(qm); push(qm); } bool -SimpleQueue::dispatch(MessageConsumer& mc) -{ +SimpleQueue::dispatch(MessageConsumer& mc) { boost::shared_ptr<QueuedMessage> qm; if (m_messages->consume(qm)) { boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false)); @@ -171,110 +162,95 @@ SimpleQueue::dispatch(MessageConsumer& mc) } bool -SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) -{ - return enqueue(s_nullTxnHandle, qm); +SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) { + return enqueue(0, qm); } bool -SimpleQueue::enqueue(qpid::broker::TxnHandle& th, - boost::shared_ptr<QueuedMessage> qm) -{ +SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb, + boost::shared_ptr<QueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } if (qm->payload()->isPersistent() && m_store) { qm->payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(th, qm); + return asyncEnqueue(tb, qm); } return false; } bool -SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) -{ - return dequeue(s_nullTxnHandle, qm); +SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) { + return dequeue(0, qm); } bool -SimpleQueue::dequeue(qpid::broker::TxnHandle& th, - boost::shared_ptr<QueuedMessage> qm) -{ +SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb, + boost::shared_ptr<QueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } if (qm->payload()->isPersistent() && m_store) { qm->payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(th, qm); + return asyncDequeue(tb, qm); } return true; } void -SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) -{ +SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg))); } void -SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage> /*msg*/) -{} +SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage>) {} void -SimpleQueue::encode(qpid::framing::Buffer& buffer) const -{ +SimpleQueue::encode(qpid::framing::Buffer& buffer) const { buffer.putShortString(m_name); } uint32_t -SimpleQueue::encodedSize() const -{ +SimpleQueue::encodedSize() const { return m_name.size() + 1; } uint64_t -SimpleQueue::getPersistenceId() const -{ +SimpleQueue::getPersistenceId() const { return m_persistenceId; } void -SimpleQueue::setPersistenceId(uint64_t persistenceId) const -{ +SimpleQueue::setPersistenceId(uint64_t persistenceId) const { m_persistenceId = persistenceId; } void -SimpleQueue::flush() -{ +SimpleQueue::flush() { //if(m_store) m_store->flush(*this); } const std::string& -SimpleQueue::getName() const -{ +SimpleQueue::getName() const { return m_name; } void -SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) -{ +SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) { if (externalQueueStore != inst && externalQueueStore) delete externalQueueStore; externalQueueStore = inst; } uint64_t -SimpleQueue::getSize() -{ +SimpleQueue::getSize() { return m_persistableData.size(); } void -SimpleQueue::write(char* target) -{ +SimpleQueue::write(char* target) { ::memcpy(target, m_persistableData.data(), m_persistableData.size()); } @@ -344,21 +320,20 @@ SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, // private bool -SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, - boost::shared_ptr<QueuedMessage> qm) -{ +SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb, + boost::shared_ptr<QueuedMessage> qm) { assert(qm.get()); -// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), qm->payload(), - th, + tb, &handleAsyncEnqueueResult, &m_resultQueue)); - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - if (th.isValid()) { - th.incrOpCnt(); + if (tb) { + tb->incrOpCnt(); + m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac); } - m_store->submitEnqueue(qm->enqHandle(), th, qac); ++m_asyncOpCounter; return true; } @@ -382,22 +357,21 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con // private bool -SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, +SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::broker::TxnBuffer* tb, boost::shared_ptr<QueuedMessage> qm) { assert(qm.get()); boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), qm->payload(), - th, + tb, &handleAsyncDequeueResult, &m_resultQueue)); - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - if (th.isValid()) { - th.incrOpCnt(); + if (tb) { + tb->incrOpCnt(); + m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac); } - m_store->submitDequeue(qm->enqHandle(), - th, - qac); ++m_asyncOpCounter; return true; } @@ -420,8 +394,7 @@ SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* con // private void -SimpleQueue::destroyCheck(const std::string& opDescr) const -{ +SimpleQueue::destroyCheck(const std::string& opDescr) const { if (m_destroyPending || m_destroyed) { std::ostringstream oss; oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; @@ -431,55 +404,54 @@ SimpleQueue::destroyCheck(const std::string& opDescr) const // private void -SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) -{ - assert(qc->getQueue().get() == this); +SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } --m_asyncOpCounter; } // private void -SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) -{ - assert(qc->getQueue().get() == this); +SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } --m_asyncOpCounter; } // private void -SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) -{ - assert(qc->getQueue().get() == this); +SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } --m_asyncOpCounter; m_destroyed = true; } // private void -SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) -{ - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; - - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - qpid::broker::TxnHandle th = qc->getTxnHandle(); - if (th.isValid()) { // transactional enqueue - th.decrOpCnt(); +SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } } + --m_asyncOpCounter; } // private void -SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) -{ - assert(qc->getQueue().get() == this); - --m_asyncOpCounter; - - // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store) - qpid::broker::TxnHandle th = qc->getTxnHandle(); - if (th.isValid()) { // transactional enqueue - th.decrOpCnt(); +SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } } + --m_asyncOpCounter; } }}} // namespace tests::storePerftools::asyncPerf |