summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp37
1 files changed, 13 insertions, 24 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index 292fc35925..f297e83402 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -26,7 +26,7 @@
#include "DeliveryRecord.h"
#include "MessageConsumer.h"
#include "MessageDeque.h"
-#include "PersistableQueuedMessage.h"
+#include "QueuedMessage.h"
#include "SimpleMessage.h"
#include "qpid/broker/AsyncResultHandle.h"
@@ -153,12 +153,7 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con
void
SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
{
- boost::shared_ptr<QueuedMessage> qm;
- if (msg->isPersistent() && m_store) {
- qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
- } else {
- qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
- }
+ boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
enqueue(s_nullTxnHandle, qm);
push(qm);
}
@@ -191,7 +186,7 @@ SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
+ return asyncEnqueue(th, qm);
}
return false;
}
@@ -212,7 +207,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
}
if (qm->payload()->isPersistent() && m_store) {
qm->payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
+ return asyncDequeue(th, qm);
}
return true;
}
@@ -220,13 +215,7 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
void
SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
{
- boost::shared_ptr<QueuedMessage> qm;
- if (msg->isPersistent() && m_store) {
- qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
- } else {
- qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
- }
- push(qm);
+ push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
}
void
@@ -356,12 +345,12 @@ SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
// private
bool
SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<PersistableQueuedMessage> pqm)
+ boost::shared_ptr<QueuedMessage> qm)
{
- assert(pqm.get());
+ assert(qm.get());
// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- pqm->payload(),
+ qm->payload(),
th,
&handleAsyncEnqueueResult,
&m_resultQueue));
@@ -369,7 +358,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitEnqueue(pqm->enqHandle(), th, qac);
+ m_store->submitEnqueue(qm->enqHandle(), th, qac);
++m_asyncOpCounter;
return true;
}
@@ -394,11 +383,11 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con
// private
bool
SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
- boost::shared_ptr<PersistableQueuedMessage> pqm)
+ boost::shared_ptr<QueuedMessage> qm)
{
- assert(pqm.get());
+ assert(qm.get());
boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- pqm->payload(),
+ qm->payload(),
th,
&handleAsyncDequeueResult,
&m_resultQueue));
@@ -406,7 +395,7 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitDequeue(pqm->enqHandle(),
+ m_store->submitDequeue(qm->enqHandle(),
th,
qac);
++m_asyncOpCounter;