diff options
Diffstat (limited to 'cpp/src/tests')
6 files changed, 79 insertions, 111 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp index 5bcf3fe401..e3bfe9ae7a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp @@ -32,10 +32,8 @@ namespace storePerftools { namespace asyncPerf { MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, - const qpid::asyncStore::AsyncOperation::opCode op, boost::shared_ptr<SimpleQueue> q) : m_msg(msg), - m_op(op), m_q(q) { assert(m_msg.get() != 0); @@ -45,18 +43,6 @@ MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg MessageAsyncContext::~MessageAsyncContext() {} -qpid::asyncStore::AsyncOperation::opCode -MessageAsyncContext::getOpCode() const -{ - return m_op; -} - -const char* -MessageAsyncContext::getOpStr() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - boost::intrusive_ptr<SimpleMessage> MessageAsyncContext::getMessage() const { diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h index 77d7be286b..9252fbda45 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h @@ -40,18 +40,14 @@ class MessageAsyncContext : public qpid::broker::BrokerAsyncContext { public: MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, - const qpid::asyncStore::AsyncOperation::opCode op, boost::shared_ptr<SimpleQueue> q); virtual ~MessageAsyncContext(); - qpid::asyncStore::AsyncOperation::opCode getOpCode() const; - const char* getOpStr() const; boost::intrusive_ptr<SimpleMessage> getMessage() const; boost::shared_ptr<SimpleQueue> getQueue() const; void destroy(); private: boost::intrusive_ptr<SimpleMessage> m_msg; - const qpid::asyncStore::AsyncOperation::opCode m_op; boost::shared_ptr<SimpleQueue> m_q; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp index f2eea9bad3..0312f61d3c 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp @@ -33,12 +33,10 @@ namespace asyncPerf { QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, qpid::broker::TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq) : m_q(q), m_th(th), - m_op(op), m_rcb(rcb), m_arq(arq) { @@ -48,13 +46,11 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, boost::intrusive_ptr<SimpleMessage> msg, qpid::broker::TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq) : m_q(q), m_msg(msg), m_th(th), - m_op(op), m_rcb(rcb), m_arq(arq) { @@ -65,18 +61,6 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, QueueAsyncContext::~QueueAsyncContext() {} -qpid::asyncStore::AsyncOperation::opCode -QueueAsyncContext::getOpCode() const -{ - return m_op; -} - -const char* -QueueAsyncContext::getOpStr() const -{ - return qpid::asyncStore::AsyncOperation::getOpStr(m_op); -} - boost::shared_ptr<SimpleQueue> QueueAsyncContext::getQueue() const { diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index e3e87b8ad8..4e3d9fe2db 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -49,18 +49,14 @@ class QueueAsyncContext: public qpid::broker::BrokerAsyncContext public: QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, qpid::broker::TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, boost::intrusive_ptr<SimpleMessage> msg, qpid::broker::TxnHandle& th, - const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); virtual ~QueueAsyncContext(); - qpid::asyncStore::AsyncOperation::opCode getOpCode() const; - const char* getOpStr() const; boost::shared_ptr<SimpleQueue> getQueue() const; boost::intrusive_ptr<SimpleMessage> getMessage() const; qpid::broker::TxnHandle getTxnHandle() const; @@ -72,8 +68,7 @@ public: private: boost::shared_ptr<SimpleQueue> m_q; boost::intrusive_ptr<SimpleMessage> m_msg; - qpid::broker::TxnHandle m_th; - const qpid::asyncStore::AsyncOperation::opCode m_op; + qpid::broker::TxnHandle m_th; // TODO: get rid of this qpid::broker::AsyncResultCallback m_rcb; qpid::broker::AsyncResultQueue* const m_arq; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index 2a6f2b208b..79b8b46919 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -33,7 +33,6 @@ #include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/TxnHandle.h" -#include <boost/make_shared.hpp> #include <string.h> // memcpy() namespace tests { @@ -69,43 +68,6 @@ SimpleQueue::SimpleQueue(const std::string& name, SimpleQueue::~SimpleQueue() {} -// static -void -SimpleQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh) -{ - if (arh) { - boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - // Handle async success here - switch(qc->getOpCode()) { - case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: - qc->getQueue()->createComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: - qc->getQueue()->flushComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: - qc->getQueue()->destroyComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: - qc->getQueue()->enqueueComplete(qc); - break; - case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: - qc->getQueue()->dequeueComplete(qc); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::SimpleQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); - throw qpid::Exception(oss.str()); - }; - } - } -} - const qpid::broker::QueueHandle& SimpleQueue::getHandle() const { @@ -130,16 +92,28 @@ SimpleQueue::asyncCreate() if (m_store) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), s_nullTxnHandle, - qpid::asyncStore::AsyncOperation::QUEUE_CREATE, - &handleAsyncResult, + &handleAsyncCreateResult, &m_resultQueue)); - m_store->submitCreate(m_queueHandle, - this, - qac); + m_store->submitCreate(m_queueHandle, this, qac); ++m_asyncOpCounter; } } +//static +void +SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->createComplete(qc); + } + } +} + void SimpleQueue::asyncDestroy(const bool deleteQueue) { @@ -148,25 +122,38 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) if (deleteQueue) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), s_nullTxnHandle, - qpid::asyncStore::AsyncOperation::QUEUE_DESTROY, - &handleAsyncResult, + &handleAsyncDestroyResult, &m_resultQueue)); - m_store->submitDestroy(m_queueHandle, - qac); + m_store->submitDestroy(m_queueHandle, qac); ++m_asyncOpCounter; } m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); } } +//static +void +SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->destroyComplete(qc); + } + } +} + void SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { boost::shared_ptr<QueuedMessage> qm; if (msg->isPersistent() && m_store) { - qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); } else { - qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)); } enqueue(s_nullTxnHandle, qm); push(qm); @@ -231,9 +218,9 @@ SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { boost::shared_ptr<QueuedMessage> qm; if (msg->isPersistent() && m_store) { - qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + qm = boost::shared_ptr<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); } else { - qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + qm = boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)); } push(qm); } @@ -357,9 +344,6 @@ void SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, bool /*isRecovery*/) { -boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); -assert(pqm.get()); - m_messages->push(qm); } @@ -375,8 +359,7 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), pqm->payload(), th, - qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, - &handleAsyncResult, + &handleAsyncEnqueueResult, &m_resultQueue)); // TODO : This must be done from inside store, not here if (th.isValid()) { @@ -389,6 +372,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, return true; } +// private static +void +SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->enqueueComplete(qc); + } + } +} + // private bool SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, @@ -398,8 +396,7 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), pqm->payload(), th, - qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, - &handleAsyncResult, + &handleAsyncDequeueResult, &m_resultQueue)); // TODO : This must be done from inside store, not here if (th.isValid()) { @@ -411,6 +408,20 @@ SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, ++m_asyncOpCounter; return true; } +// private static +void +SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + qc->getQueue()->dequeueComplete(qc); + } + } +} // private void @@ -455,9 +466,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) assert(qc->getQueue().get() == this); --m_asyncOpCounter; - qpid::broker::TxnHandle th = qc->getTxnHandle(); - // TODO : This must be done from inside store, not here + qpid::broker::TxnHandle th = qc->getTxnHandle(); if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } @@ -470,9 +480,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) assert(qc->getQueue().get() == this); --m_asyncOpCounter; - qpid::broker::TxnHandle th = qc->getTxnHandle(); - // TODO : This must be done from inside store, not here + qpid::broker::TxnHandle th = qc->getTxnHandle(); if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h index 2763ae3159..f13febbafa 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h @@ -35,9 +35,6 @@ #include <boost/enable_shared_from_this.hpp> namespace qpid { -namespace asyncStore { -//class AsyncStoreImpl; -} namespace broker { class AsyncResultQueue; } @@ -67,13 +64,14 @@ public: qpid::broker::AsyncResultQueue& arq); virtual ~SimpleQueue(); - static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res); const qpid::broker::QueueHandle& getHandle() const; qpid::broker::QueueHandle& getHandle(); qpid::broker::AsyncStore* getStore(); void asyncCreate(); + static void handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh); void asyncDestroy(const bool deleteQueue); + static void handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh); // --- Methods in msg handling path from qpid::Queue --- void deliver(boost::intrusive_ptr<SimpleMessage> msg); @@ -98,7 +96,7 @@ public: virtual const std::string& getName() const; virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); - // --- Interface DataStore --- + // --- Interface qpid::broker::DataStore --- virtual uint64_t getSize(); virtual void write(char* target); @@ -116,8 +114,7 @@ private: bool m_destroyed; // --- Members & methods in msg handling path copied from qpid::Queue --- - struct UsageBarrier - { + struct UsageBarrier { SimpleQueue& m_parent; uint32_t m_count; qpid::sys::Monitor m_monitor; @@ -126,8 +123,7 @@ private: void release(); void destroy(); }; - struct ScopedUse - { + struct ScopedUse { UsageBarrier& m_barrier; const bool m_acquired; ScopedUse(UsageBarrier& b); @@ -141,8 +137,10 @@ private: // -- Async ops --- bool asyncEnqueue(qpid::broker::TxnHandle& th, boost::shared_ptr<PersistableQueuedMessage> pqm); + static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh); bool asyncDequeue(qpid::broker::TxnHandle& th, boost::shared_ptr<PersistableQueuedMessage> pqm); + static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh); // --- Async op counter --- void destroyCheck(const std::string& opDescr) const; |