summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-07-20 12:55:20 +0000
committerKim van der Riet <kpvdr@apache.org>2012-07-20 12:55:20 +0000
commit2e437e1569009d8e8ed3ed896d751994e2e85d74 (patch)
tree28459e48638bfcd3a7e565c37165b3fab714d484 /cpp/src/tests
parentc94c9b5333c06c03deb6a6dcb1a91ecdf111b481 (diff)
downloadqpid-python-2e437e1569009d8e8ed3ed896d751994e2e85d74.tar.gz
QPID-3858: WIP: Created many async operation classes for each op instead of a single class with op codes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1363759 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp14
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp16
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h7
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp133
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h16
6 files changed, 79 insertions, 111 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
index 5bcf3fe401..e3bfe9ae7a 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
@@ -32,10 +32,8 @@ namespace storePerftools {
namespace asyncPerf {
MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
- const qpid::asyncStore::AsyncOperation::opCode op,
boost::shared_ptr<SimpleQueue> q) :
m_msg(msg),
- m_op(op),
m_q(q)
{
assert(m_msg.get() != 0);
@@ -45,18 +43,6 @@ MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg
MessageAsyncContext::~MessageAsyncContext()
{}
-qpid::asyncStore::AsyncOperation::opCode
-MessageAsyncContext::getOpCode() const
-{
- return m_op;
-}
-
-const char*
-MessageAsyncContext::getOpStr() const
-{
- return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
-}
-
boost::intrusive_ptr<SimpleMessage>
MessageAsyncContext::getMessage() const
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
index 77d7be286b..9252fbda45 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
@@ -40,18 +40,14 @@ class MessageAsyncContext : public qpid::broker::BrokerAsyncContext
{
public:
MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
- const qpid::asyncStore::AsyncOperation::opCode op,
boost::shared_ptr<SimpleQueue> q);
virtual ~MessageAsyncContext();
- qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
- const char* getOpStr() const;
boost::intrusive_ptr<SimpleMessage> getMessage() const;
boost::shared_ptr<SimpleQueue> getQueue() const;
void destroy();
private:
boost::intrusive_ptr<SimpleMessage> m_msg;
- const qpid::asyncStore::AsyncOperation::opCode m_op;
boost::shared_ptr<SimpleQueue> m_q;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
index f2eea9bad3..0312f61d3c 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
@@ -33,12 +33,10 @@ namespace asyncPerf {
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
qpid::broker::TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq) :
m_q(q),
m_th(th),
- m_op(op),
m_rcb(rcb),
m_arq(arq)
{
@@ -48,13 +46,11 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
boost::intrusive_ptr<SimpleMessage> msg,
qpid::broker::TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq) :
m_q(q),
m_msg(msg),
m_th(th),
- m_op(op),
m_rcb(rcb),
m_arq(arq)
{
@@ -65,18 +61,6 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
QueueAsyncContext::~QueueAsyncContext()
{}
-qpid::asyncStore::AsyncOperation::opCode
-QueueAsyncContext::getOpCode() const
-{
- return m_op;
-}
-
-const char*
-QueueAsyncContext::getOpStr() const
-{
- return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
-}
-
boost::shared_ptr<SimpleQueue>
QueueAsyncContext::getQueue() const
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
index e3e87b8ad8..4e3d9fe2db 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
@@ -49,18 +49,14 @@ class QueueAsyncContext: public qpid::broker::BrokerAsyncContext
public:
QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
qpid::broker::TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
boost::intrusive_ptr<SimpleMessage> msg,
qpid::broker::TxnHandle& th,
- const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq);
virtual ~QueueAsyncContext();
- qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
- const char* getOpStr() const;
boost::shared_ptr<SimpleQueue> getQueue() const;
boost::intrusive_ptr<SimpleMessage> getMessage() const;
qpid::broker::TxnHandle getTxnHandle() const;
@@ -72,8 +68,7 @@ public:
private:
boost::shared_ptr<SimpleQueue> m_q;
boost::intrusive_ptr<SimpleMessage> m_msg;
- qpid::broker::TxnHandle m_th;
- const qpid::asyncStore::AsyncOperation::opCode m_op;
+ qpid::broker::TxnHandle m_th; // TODO: get rid of this
qpid::broker::AsyncResultCallback m_rcb;
qpid::broker::AsyncResultQueue* const m_arq;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index 2a6f2b208b..79b8b46919 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -33,7 +33,6 @@
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/TxnHandle.h"
-#include <boost/make_shared.hpp>
#include <string.h> // memcpy()
namespace tests {
@@ -69,43 +68,6 @@ SimpleQueue::SimpleQueue(const std::string& name,
SimpleQueue::~SimpleQueue()
{}
-// static
-void
-SimpleQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh)
-{
- if (arh) {
- boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
- if (arh->getErrNo()) {
- // TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
- << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
- } else {
- // Handle async success here
- switch(qc->getOpCode()) {
- case qpid::asyncStore::AsyncOperation::QUEUE_CREATE:
- qc->getQueue()->createComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH:
- qc->getQueue()->flushComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY:
- qc->getQueue()->destroyComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE:
- qc->getQueue()->enqueueComplete(qc);
- break;
- case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE:
- qc->getQueue()->dequeueComplete(qc);
- break;
- default:
- std::ostringstream oss;
- oss << "tests::storePerftools::asyncPerf::SimpleQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode();
- throw qpid::Exception(oss.str());
- };
- }
- }
-}
-
const qpid::broker::QueueHandle&
SimpleQueue::getHandle() const
{
@@ -130,16 +92,28 @@ SimpleQueue::asyncCreate()
if (m_store) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
s_nullTxnHandle,
- qpid::asyncStore::AsyncOperation::QUEUE_CREATE,
- &handleAsyncResult,
+ &handleAsyncCreateResult,
&m_resultQueue));
- m_store->submitCreate(m_queueHandle,
- this,
- qac);
+ m_store->submitCreate(m_queueHandle, this, qac);
++m_asyncOpCounter;
}
}
+//static
+void
+SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->createComplete(qc);
+ }
+ }
+}
+
void
SimpleQueue::asyncDestroy(const bool deleteQueue)
{
@@ -148,25 +122,38 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
if (deleteQueue) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
s_nullTxnHandle,
- qpid::asyncStore::AsyncOperation::QUEUE_DESTROY,
- &handleAsyncResult,
+ &handleAsyncDestroyResult,
&m_resultQueue));
- m_store->submitDestroy(m_queueHandle,
- qac);
+ m_store->submitDestroy(m_queueHandle, qac);
++m_asyncOpCounter;
}
m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000));
}
}
+//static
+void
+SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->destroyComplete(qc);
+ }
+ }
+}
+
void
SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
{
boost::shared_ptr<QueuedMessage> qm;
if (msg->isPersistent() && m_store) {
- qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
} else {
- qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
}
enqueue(s_nullTxnHandle, qm);
push(qm);
@@ -231,9 +218,9 @@ SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
{
boost::shared_ptr<QueuedMessage> qm;
if (msg->isPersistent() && m_store) {
- qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
+ qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg));
} else {
- qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg));
+ qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg));
}
push(qm);
}
@@ -357,9 +344,6 @@ void
SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
bool /*isRecovery*/)
{
-boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm);
-assert(pqm.get());
-
m_messages->push(qm);
}
@@ -375,8 +359,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
pqm->payload(),
th,
- qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
- &handleAsyncResult,
+ &handleAsyncEnqueueResult,
&m_resultQueue));
// TODO : This must be done from inside store, not here
if (th.isValid()) {
@@ -389,6 +372,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
return true;
}
+// private static
+void
+SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->enqueueComplete(qc);
+ }
+ }
+}
+
// private
bool
SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
@@ -398,8 +396,7 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
pqm->payload(),
th,
- qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
- &handleAsyncResult,
+ &handleAsyncDequeueResult,
&m_resultQueue));
// TODO : This must be done from inside store, not here
if (th.isValid()) {
@@ -411,6 +408,20 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
++m_asyncOpCounter;
return true;
}
+// private static
+void
+SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ // TODO: Handle async failure here (other than by simply printing a message)
+ std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
+ << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ } else {
+ qc->getQueue()->dequeueComplete(qc);
+ }
+ }
+}
// private
void
@@ -455,9 +466,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
- qpid::broker::TxnHandle th = qc->getTxnHandle();
-
// TODO : This must be done from inside store, not here
+ qpid::broker::TxnHandle th = qc->getTxnHandle();
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}
@@ -470,9 +480,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
assert(qc->getQueue().get() == this);
--m_asyncOpCounter;
- qpid::broker::TxnHandle th = qc->getTxnHandle();
-
// TODO : This must be done from inside store, not here
+ qpid::broker::TxnHandle th = qc->getTxnHandle();
if (th.isValid()) { // transactional enqueue
th.decrOpCnt();
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
index 2763ae3159..f13febbafa 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
@@ -35,9 +35,6 @@
#include <boost/enable_shared_from_this.hpp>
namespace qpid {
-namespace asyncStore {
-//class AsyncStoreImpl;
-}
namespace broker {
class AsyncResultQueue;
}
@@ -67,13 +64,14 @@ public:
qpid::broker::AsyncResultQueue& arq);
virtual ~SimpleQueue();
- static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res);
const qpid::broker::QueueHandle& getHandle() const;
qpid::broker::QueueHandle& getHandle();
qpid::broker::AsyncStore* getStore();
void asyncCreate();
+ static void handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh);
void asyncDestroy(const bool deleteQueue);
+ static void handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh);
// --- Methods in msg handling path from qpid::Queue ---
void deliver(boost::intrusive_ptr<SimpleMessage> msg);
@@ -98,7 +96,7 @@ public:
virtual const std::string& getName() const;
virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst);
- // --- Interface DataStore ---
+ // --- Interface qpid::broker::DataStore ---
virtual uint64_t getSize();
virtual void write(char* target);
@@ -116,8 +114,7 @@ private:
bool m_destroyed;
// --- Members & methods in msg handling path copied from qpid::Queue ---
- struct UsageBarrier
- {
+ struct UsageBarrier {
SimpleQueue& m_parent;
uint32_t m_count;
qpid::sys::Monitor m_monitor;
@@ -126,8 +123,7 @@ private:
void release();
void destroy();
};
- struct ScopedUse
- {
+ struct ScopedUse {
UsageBarrier& m_barrier;
const bool m_acquired;
ScopedUse(UsageBarrier& b);
@@ -141,8 +137,10 @@ private:
// -- Async ops ---
bool asyncEnqueue(qpid::broker::TxnHandle& th,
boost::shared_ptr<PersistableQueuedMessage> pqm);
+ static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh);
bool asyncDequeue(qpid::broker::TxnHandle& th,
boost::shared_ptr<PersistableQueuedMessage> pqm);
+ static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh);
// --- Async op counter ---
void destroyCheck(const std::string& opDescr) const;