From f9f2c6aae6514d0e279b95aaaf90195326738b66 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Mon, 11 Jun 2012 15:10:11 +0000 Subject: WIP: Fixed some logic errors in the non-persistent pathway git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1348891 13f79535-47bb-0310-9956-ffa450edef68 --- .../tests/storePerftools/asyncPerf/MessageConsumer.cpp | 2 -- .../tests/storePerftools/asyncPerf/MessageProducer.cpp | 1 - .../storePerftools/asyncPerf/SimplePersistableQueue.cpp | 17 ++++++++--------- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 07baabf6d6..0a2fd4f333 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -46,7 +46,6 @@ MessageConsumer::~MessageConsumer() void* MessageConsumer::runConsumers() { -/* uint32_t numMsgs = 0; while (numMsgs < m_perfTestParams.m_numMsgs) { if (m_queue->dispatch()) { @@ -55,7 +54,6 @@ MessageConsumer::runConsumers() ::usleep(1000); // TODO - replace this poller with condition variable } } -*/ return 0; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index e75f773e23..5bb72da883 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -51,7 +51,6 @@ MessageProducer::~MessageProducer() void* MessageProducer::runProducers() { - boost::shared_ptr txn; for (uint32_t numMsgs=0; numMsgs msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); m_queue->deliver(msg); diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp index 198c43b087..3d973e2957 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp @@ -159,9 +159,8 @@ void SimplePersistableQueue::deliver(boost::shared_ptr msg) { QueuedMessage qm(this, msg); - if(enqueue((SimpleTransactionContext*)0, qm)) { - push(qm); - } + enqueue((SimpleTransactionContext*)0, qm); + push(qm); } bool @@ -176,7 +175,7 @@ SimplePersistableQueue::dispatch() bool SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, - QueuedMessage& qm) + QueuedMessage& qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { @@ -191,7 +190,7 @@ SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, bool SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, - QueuedMessage& qm) + QueuedMessage& qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { @@ -201,7 +200,7 @@ SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, qm.payload()->dequeueAsync(shared_from_this(), m_store); return asyncDequeue(ctxt, qm); } - return false; + return true; } void @@ -317,7 +316,7 @@ SimplePersistableQueue::ScopedUse::~ScopedUse() // private void SimplePersistableQueue::push(QueuedMessage& qm, - bool /*isRecovery*/) + bool /*isRecovery*/) { QueuedMessage removed; m_messages->push(qm, removed); @@ -328,7 +327,7 @@ SimplePersistableQueue::push(QueuedMessage& qm, // private bool SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn, - QueuedMessage& qm) + QueuedMessage& qm) { qm.payload()->setPersistenceId(m_store->getNextRid()); //std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; @@ -347,7 +346,7 @@ SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn, // private bool SimplePersistableQueue::asyncDequeue(SimpleTransactionContext* txn, - QueuedMessage& qm) + QueuedMessage& qm) { //std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), -- cgit v1.2.1