diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-06-11 15:10:11 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-11 15:10:11 +0000 |
commit | f9f2c6aae6514d0e279b95aaaf90195326738b66 (patch) | |
tree | 66a364ad735bac35c21eb3cee612c728690eae75 | |
parent | 356337b9651760059c22f1937f04406a43383452 (diff) | |
download | qpid-python-f9f2c6aae6514d0e279b95aaaf90195326738b66.tar.gz |
WIP: Fixed some logic errors in the non-persistent pathway
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1348891 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 8 insertions, 12 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 07baabf6d6..0a2fd4f333 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -46,7 +46,6 @@ MessageConsumer::~MessageConsumer() void* MessageConsumer::runConsumers() { -/* uint32_t numMsgs = 0; while (numMsgs < m_perfTestParams.m_numMsgs) { if (m_queue->dispatch()) { @@ -55,7 +54,6 @@ MessageConsumer::runConsumers() ::usleep(1000); // TODO - replace this poller with condition variable } } -*/ return 0; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index e75f773e23..5bb72da883 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -51,7 +51,6 @@ MessageProducer::~MessageProducer() void* MessageProducer::runProducers() { - boost::shared_ptr<SimpleTransactionContext> txn; for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) { boost::shared_ptr<SimplePersistableMessage> msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); m_queue->deliver(msg); diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp index 198c43b087..3d973e2957 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp @@ -159,9 +159,8 @@ void SimplePersistableQueue::deliver(boost::shared_ptr<SimplePersistableMessage> msg) { QueuedMessage qm(this, msg); - if(enqueue((SimpleTransactionContext*)0, qm)) { - push(qm); - } + enqueue((SimpleTransactionContext*)0, qm); + push(qm); } bool @@ -176,7 +175,7 @@ SimplePersistableQueue::dispatch() bool SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, - QueuedMessage& qm) + QueuedMessage& qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { @@ -191,7 +190,7 @@ SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt, bool SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, - QueuedMessage& qm) + QueuedMessage& qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { @@ -201,7 +200,7 @@ SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt, qm.payload()->dequeueAsync(shared_from_this(), m_store); return asyncDequeue(ctxt, qm); } - return false; + return true; } void @@ -317,7 +316,7 @@ SimplePersistableQueue::ScopedUse::~ScopedUse() // private void SimplePersistableQueue::push(QueuedMessage& qm, - bool /*isRecovery*/) + bool /*isRecovery*/) { QueuedMessage removed; m_messages->push(qm, removed); @@ -328,7 +327,7 @@ SimplePersistableQueue::push(QueuedMessage& qm, // private bool SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn, - QueuedMessage& qm) + QueuedMessage& qm) { qm.payload()->setPersistenceId(m_store->getNextRid()); //std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; @@ -347,7 +346,7 @@ SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn, // private bool SimplePersistableQueue::asyncDequeue(SimpleTransactionContext* txn, - QueuedMessage& qm) + QueuedMessage& qm) { //std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), |