summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-07-16 13:54:11 +0000
committerKim van der Riet <kpvdr@apache.org>2012-07-16 13:54:11 +0000
commita804510d81ade0594a75b5c9b8765cafcc233245 (patch)
tree8c6be643564b6d8c88619d17de7150c98a314781 /cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
parent1ab07197127e990da2c765ea0ffa5fd8ca47b7b6 (diff)
downloadqpid-python-a804510d81ade0594a75b5c9b8765cafcc233245.tar.gz
QPID-3858: Refactor to tidy up several class design issues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1362039 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp86
1 files changed, 58 insertions, 28 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index 8bb79367ed..3bce2fb52a 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -26,13 +26,15 @@
#include "DeliveryRecord.h"
#include "MessageConsumer.h"
#include "MessageDeque.h"
-#include "SimpleMessage.h"
+#include "PersistableQueuedMessage.h"
#include "QueueAsyncContext.h"
#include "QueuedMessage.h"
+#include "SimpleMessage.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/broker/AsyncResultHandle.h"
-#include "qpid/broker/TxnHandle.h"
+
+#include <boost/make_shared.hpp>
namespace tests {
namespace storePerftools {
@@ -44,7 +46,7 @@ qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operat
SimpleQueue::SimpleQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq) :
qpid::broker::PersistableQueue(),
m_name(name),
@@ -117,7 +119,7 @@ SimpleQueue::getHandle()
return m_queueHandle;
}
-qpid::asyncStore::AsyncStoreImpl*
+qpid::broker::AsyncStore*
SimpleQueue::getStore()
{
return m_store;
@@ -161,15 +163,24 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
void
SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
{
- QueuedMessage qm(this, msg);
+ boost::shared_ptr<QueuedMessage> qm;
+ if (msg->isPersistent() && m_store) {
+ qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ } else {
+ qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ }
+//boost::shared_ptr<PersistableQueuedMessage> pqm1 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+//assert(pqm1.get());
enqueue(s_nullTxnHandle, qm);
+//boost::shared_ptr<PersistableQueuedMessage> pqm2 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+//assert(pqm2.get());
push(qm);
}
bool
SimpleQueue::dispatch(MessageConsumer& mc)
{
- QueuedMessage qm;
+ boost::shared_ptr<QueuedMessage> qm;
if (m_messages->consume(qm)) {
boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false));
mc.record(dr);
@@ -179,43 +190,47 @@ SimpleQueue::dispatch(MessageConsumer& mc)
}
bool
-SimpleQueue::enqueue(QueuedMessage& qm)
+SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm)
{
return enqueue(s_nullTxnHandle, qm);
}
bool
SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<QueuedMessage> qm)
{
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
- if (qm.payload()->isPersistent() && m_store) {
- qm.payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(th, qm);
+ if (qm->payload()->isPersistent() && m_store) {
+ qm->payload()->enqueueAsync(shared_from_this(), m_store);
+ return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
}
return false;
}
bool
-SimpleQueue::dequeue(QueuedMessage& qm)
+SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm)
{
return dequeue(s_nullTxnHandle, qm);
}
bool
SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<QueuedMessage> qm)
{
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
}
- if (qm.payload()->isPersistent() && m_store) {
- qm.payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(th, qm);
+ if (qm->payload()->isPersistent() && m_store) {
+ qm->payload()->dequeueAsync(shared_from_this(), m_store);
+//assert(qm.get());
+//boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+//assert(pqm.get());
+//return asyncDequeue(th, pqm);
+ return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm));
}
return true;
}
@@ -223,7 +238,12 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
void
SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
{
- QueuedMessage qm(this, msg);
+ boost::shared_ptr<QueuedMessage> qm;
+ if (msg->isPersistent() && m_store) {
+ qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ } else {
+ qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ }
push(qm);
}
@@ -343,11 +363,13 @@ SimpleQueue::ScopedUse::~ScopedUse()
// private
void
-SimpleQueue::push(QueuedMessage& qm,
+SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
bool /*isRecovery*/)
{
- QueuedMessage removed;
- m_messages->push(qm, removed);
+boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
+assert(pqm.get());
+
+ m_messages->push(qm);
}
// --- End Members & methods in msg handling path from qpid::Queue ---
@@ -355,20 +377,22 @@ SimpleQueue::push(QueuedMessage& qm,
// private
bool
SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<PersistableQueuedMessage> pqm)
{
- qm.payload()->setPersistenceId(m_store->getNextRid());
-//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
+ assert(pqm.get());
+// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
+//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << pqm->payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- qm.payload(),
+ pqm->payload(),
th,
qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
&handleAsyncResult,
&m_resultQueue));
+ // TODO : This must be done from inside store, not here
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitEnqueue(qm.enqHandle(),
+ m_store->submitEnqueue(pqm->enqHandle(),
th,
qac);
++m_asyncOpCounter;
@@ -378,19 +402,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
// private
bool
SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+ boost::shared_ptr<PersistableQueuedMessage> pqm)
{
+ assert(pqm.get());
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- qm.payload(),
+ pqm->payload(),
th,
qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
&handleAsyncResult,
&m_resultQueue));
+ // TODO : This must be done from inside store, not here
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitDequeue(qm.enqHandle(),
+ m_store->submitDequeue(pqm->enqHandle(),
th,
qac);
++m_asyncOpCounter;
@@ -445,6 +471,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
--m_asyncOpCounter;
qpid::broker::TxnHandle th = qc->getTxnHandle();
+
+ // TODO : This must be done from inside store, not here
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}
@@ -459,6 +487,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
--m_asyncOpCounter;
qpid::broker::TxnHandle th = qc->getTxnHandle();
+
+ // TODO : This must be done from inside store, not here
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}