summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-11 15:10:11 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-11 15:10:11 +0000
commitf9f2c6aae6514d0e279b95aaaf90195326738b66 (patch)
tree66a364ad735bac35c21eb3cee612c728690eae75
parent356337b9651760059c22f1937f04406a43383452 (diff)
downloadqpid-python-f9f2c6aae6514d0e279b95aaaf90195326738b66.tar.gz
WIP: Fixed some logic errors in the non-persistent pathway
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1348891 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp1
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp17
3 files changed, 8 insertions, 12 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 07baabf6d6..0a2fd4f333 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -46,7 +46,6 @@ MessageConsumer::~MessageConsumer()
void*
MessageConsumer::runConsumers()
{
-/*
uint32_t numMsgs = 0;
while (numMsgs < m_perfTestParams.m_numMsgs) {
if (m_queue->dispatch()) {
@@ -55,7 +54,6 @@ MessageConsumer::runConsumers()
::usleep(1000); // TODO - replace this poller with condition variable
}
}
-*/
return 0;
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
index e75f773e23..5bb72da883 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
@@ -51,7 +51,6 @@ MessageProducer::~MessageProducer()
void*
MessageProducer::runProducers()
{
- boost::shared_ptr<SimpleTransactionContext> txn;
for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) {
boost::shared_ptr<SimplePersistableMessage> msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store));
m_queue->deliver(msg);
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
index 198c43b087..3d973e2957 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
@@ -159,9 +159,8 @@ void
SimplePersistableQueue::deliver(boost::shared_ptr<SimplePersistableMessage> msg)
{
QueuedMessage qm(this, msg);
- if(enqueue((SimpleTransactionContext*)0, qm)) {
- push(qm);
- }
+ enqueue((SimpleTransactionContext*)0, qm);
+ push(qm);
}
bool
@@ -176,7 +175,7 @@ SimplePersistableQueue::dispatch()
bool
SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt,
- QueuedMessage& qm)
+ QueuedMessage& qm)
{
ScopedUse u(m_barrier);
if (!u.m_acquired) {
@@ -191,7 +190,7 @@ SimplePersistableQueue::enqueue(SimpleTransactionContext* ctxt,
bool
SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt,
- QueuedMessage& qm)
+ QueuedMessage& qm)
{
ScopedUse u(m_barrier);
if (!u.m_acquired) {
@@ -201,7 +200,7 @@ SimplePersistableQueue::dequeue(SimpleTransactionContext* ctxt,
qm.payload()->dequeueAsync(shared_from_this(), m_store);
return asyncDequeue(ctxt, qm);
}
- return false;
+ return true;
}
void
@@ -317,7 +316,7 @@ SimplePersistableQueue::ScopedUse::~ScopedUse()
// private
void
SimplePersistableQueue::push(QueuedMessage& qm,
- bool /*isRecovery*/)
+ bool /*isRecovery*/)
{
QueuedMessage removed;
m_messages->push(qm, removed);
@@ -328,7 +327,7 @@ SimplePersistableQueue::push(QueuedMessage& qm,
// private
bool
SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn,
- QueuedMessage& qm)
+ QueuedMessage& qm)
{
qm.payload()->setPersistenceId(m_store->getNextRid());
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
@@ -347,7 +346,7 @@ SimplePersistableQueue::asyncEnqueue(SimpleTransactionContext* txn,
// private
bool
SimplePersistableQueue::asyncDequeue(SimpleTransactionContext* txn,
- QueuedMessage& qm)
+ QueuedMessage& qm)
{
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),