summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-07-20 13:50:48 +0000
committerKim van der Riet <kpvdr@apache.org>2012-07-20 13:50:48 +0000
commitdd79efca55e3ff0a0e0c25e395967ce9b3f80482 (patch)
treee10963e1718a58e3ee4cfaf41cd08bc6856d3b63 /cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
parent2e437e1569009d8e8ed3ed896d751994e2e85d74 (diff)
downloadqpid-python-dd79efca55e3ff0a0e0c25e395967ce9b3f80482.tar.gz
QPID-3858: WIP: Moved QueueAsycnContext from namespace tests::storePerftools::asyncPerf to qpid::broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1363776 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp92
1 files changed, 49 insertions, 43 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index 79b8b46919..292fc35925 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -27,10 +27,10 @@
#include "MessageConsumer.h"
#include "MessageDeque.h"
#include "PersistableQueuedMessage.h"
-#include "QueueAsyncContext.h"
#include "SimpleMessage.h"
#include "qpid/broker/AsyncResultHandle.h"
+#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/TxnHandle.h"
#include <string.h> // memcpy()
@@ -90,10 +90,10 @@ void
SimpleQueue::asyncCreate()
{
if (m_store) {
- boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- s_nullTxnHandle,
- &handleAsyncCreateResult,
- &m_resultQueue));
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
+ s_nullTxnHandle,
+ &handleAsyncCreateResult,
+ &m_resultQueue));
m_store->submitCreate(m_queueHandle, this, qac);
++m_asyncOpCounter;
}
@@ -103,13 +103,15 @@ SimpleQueue::asyncCreate()
void
SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) {
if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
+ boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
} else {
- qc->getQueue()->createComplete(qc);
+ sq->createComplete(qc);
}
}
}
@@ -120,10 +122,10 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
m_destroyPending = true;
if (m_store) {
if (deleteQueue) {
- boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- s_nullTxnHandle,
- &handleAsyncDestroyResult,
- &m_resultQueue));
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
+ s_nullTxnHandle,
+ &handleAsyncDestroyResult,
+ &m_resultQueue));
m_store->submitDestroy(m_queueHandle, qac);
++m_asyncOpCounter;
}
@@ -135,13 +137,15 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
void
SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) {
if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
+ boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
} else {
- qc->getQueue()->destroyComplete(qc);
+ sq->destroyComplete(qc);
}
}
}
@@ -356,18 +360,16 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
{
assert(pqm.get());
// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
- boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- pqm->payload(),
- th,
- &handleAsyncEnqueueResult,
- &m_resultQueue));
- // TODO : This must be done from inside store, not here
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
+ pqm->payload(),
+ th,
+ &handleAsyncEnqueueResult,
+ &m_resultQueue));
+ // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
if (th.isValid()) {
th.incrOpCnt();
}
- m_store->submitEnqueue(pqm->enqHandle(),
- th,
- qac);
+ m_store->submitEnqueue(pqm->enqHandle(), th, qac);
++m_asyncOpCounter;
return true;
}
@@ -376,13 +378,15 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
void
SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) {
if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
+ boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
} else {
- qc->getQueue()->enqueueComplete(qc);
+ sq->enqueueComplete(qc);
}
}
}
@@ -393,12 +397,12 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
boost::shared_ptr<PersistableQueuedMessage> pqm)
{
assert(pqm.get());
- boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
- pqm->payload(),
- th,
- &handleAsyncDequeueResult,
- &m_resultQueue));
- // TODO : This must be done from inside store, not here
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
+ pqm->payload(),
+ th,
+ &handleAsyncDequeueResult,
+ &m_resultQueue));
+ // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
if (th.isValid()) {
th.incrOpCnt();
}
@@ -412,13 +416,15 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
void
SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) {
if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
+ boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
} else {
- qc->getQueue()->dequeueComplete(qc);
+ sq->dequeueComplete(qc);
}
}
}
@@ -436,7 +442,7 @@ SimpleQueue::destroyCheck(const std::string& opDescr) const
// private
void
-SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
{
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
@@ -444,7 +450,7 @@ SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc)
// private
void
-SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
{
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
@@ -452,7 +458,7 @@ SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc)
// private
void
-SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
{
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
@@ -461,12 +467,12 @@ SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc)
// private
void
-SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
{
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
- // TODO : This must be done from inside store, not here
+ // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
qpid::broker::TxnHandle th = qc->getTxnHandle();
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
@@ -475,12 +481,12 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
// private
void
-SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
{
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
- // TODO : This must be done from inside store, not here
+ // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
qpid::broker::TxnHandle th = qc->getTxnHandle();
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();