summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp133
1 files changed, 71 insertions, 62 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index 2a6f2b208b..79b8b46919 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -33,7 +33,6 @@
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/TxnHandle.h"
-#include <boost/make_shared.hpp>
#include <string.h> // memcpy()
namespace tests {
@@ -69,43 +68,6 @@ SimpleQueue::SimpleQueue(const std::string& name,
SimpleQueue::~SimpleQueue()
{}
-// static
-void
-SimpleQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh)
-{
- if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
- if (arh->getErrNo()) {
- // TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
- << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
- } else {
- // Handle async success here
- switch(qc->getOpCode()) {
- case qpid::asyncStore::AsyncOperation::QUEUE_CREATE:
- qc->getQueue()->createComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH:
- qc->getQueue()->flushComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY:
- qc->getQueue()->destroyComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE:
- qc->getQueue()->enqueueComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE:
- qc->getQueue()->dequeueComplete(qc);
- break;
- default:
- std::ostringstream oss;
- oss << "tests::storePerftools::asyncPerf::SimpleQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode();
- throw qpid::Exception(oss.str());
- };
- }
- }
-}
-
const qpid::broker::QueueHandle&
SimpleQueue::getHandle() const
{
@@ -130,16 +92,28 @@ SimpleQueue::asyncCreate()
if (m_store) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
s_nullTxnHandle,
- qpid::asyncStore::AsyncOperation::QUEUE_CREATE,
- &handleAsyncResult,
+ &handleAsyncCreateResult,
&m_resultQueue));
- m_store->submitCreate(m_queueHandle,
- this,
- qac);
+ m_store->submitCreate(m_queueHandle, this, qac);
++m_asyncOpCounter;
}
}
+//static
+void
+SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->createComplete(qc);
+ }
+ }
+}
+
void
SimpleQueue::asyncDestroy(const bool deleteQueue)
{
@@ -148,25 +122,38 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
if (deleteQueue) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
s_nullTxnHandle,
- qpid::asyncStore::AsyncOperation::QUEUE_DESTROY,
- &handleAsyncResult,
+ &handleAsyncDestroyResult,
&m_resultQueue));
- m_store->submitDestroy(m_queueHandle,
- qac);
+ m_store->submitDestroy(m_queueHandle, qac);
++m_asyncOpCounter;
}
m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000));
}
}
+//static
+void
+SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->destroyComplete(qc);
+ }
+ }
+}
+
void
SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
{
boost::shared_ptr<QueuedMessage> qm;
if (msg->isPersistent() && m_store) {
- qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
} else {
- qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
}
enqueue(s_nullTxnHandle, qm);
push(qm);
@@ -231,9 +218,9 @@ SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
{
boost::shared_ptr<QueuedMessage> qm;
if (msg->isPersistent() && m_store) {
- qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
} else {
- qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
}
push(qm);
}
@@ -357,9 +344,6 @@ void
SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
bool /*isRecovery*/)
{
-boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
-assert(pqm.get());
-
m_messages->push(qm);
}
@@ -375,8 +359,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
pqm->payload(),
th,
- qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
- &handleAsyncResult,
+ &handleAsyncEnqueueResult,
&m_resultQueue));
// TODO : This must be done from inside store, not here
if (th.isValid()) {
@@ -389,6 +372,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
return true;
}
+// private static
+void
+SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->enqueueComplete(qc);
+ }
+ }
+}
+
// private
bool
SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
@@ -398,8 +396,7 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
pqm->payload(),
th,
- qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
- &handleAsyncResult,
+ &handleAsyncDequeueResult,
&m_resultQueue));
// TODO : This must be done from inside store, not here
if (th.isValid()) {
@@ -411,6 +408,20 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
++m_asyncOpCounter;
return true;
}
+// private static
+void
+SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->dequeueComplete(qc);
+ }
+ }
+}
// private
void
@@ -455,9 +466,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
- qpid::broker::TxnHandle th = qc->getTxnHandle();
-
// TODO : This must be done from inside store, not here
+ qpid::broker::TxnHandle th = qc->getTxnHandle();
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}
@@ -470,9 +480,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
- qpid::broker::TxnHandle th = qc->getTxnHandle();
-
// TODO : This must be done from inside store, not here
+ qpid::broker::TxnHandle th = qc->getTxnHandle();
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}