diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-06-07 12:42:37 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-07 12:42:37 +0000 |
commit | 22d453646b4815752134ad62e0b27841a103afb2 (patch) | |
tree | 152b6447a5c097b9617c10b7309775fc7987f996 /cpp/src/tests | |
parent | 45d67efe63abecddf5ca7a68c45f308664bd1466 (diff) | |
download | qpid-python-22d453646b4815752134ad62e0b27841a103afb2.tar.gz |
QPID-3858: WIP - added AsyncResultQueue for async result return path
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1347588 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
9 files changed, 34 insertions, 16 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp index e7cab4d621..fc04bc746e 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp @@ -34,7 +34,7 @@ MockPersistableMessage::MockPersistableMessage(const char* msgData, qpid::asyncStore::AsyncStoreImpl* store) : m_persistenceId(0ULL), m_msg(msgData, static_cast<size_t>(msgSize)), - m_msgHandle(store ? store->createMessageHandle(this) : store->createMessageHandle(0)) + m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0)) {} MockPersistableMessage::~MockPersistableMessage() diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp index 009f54a157..49d656aee4 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp @@ -30,6 +30,7 @@ #include "QueuedMessage.h" #include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/AsyncResultQueue.h" namespace tests { namespace storePerftools { @@ -37,10 +38,12 @@ namespace asyncPerf { MockPersistableQueue::MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store) : + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& resultQueue) : qpid::broker::PersistableQueue(), m_name(name), m_store(store), + m_resultQueue(resultQueue), m_asyncOpCounter(0UL), m_persistenceId(0ULL), m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. @@ -64,6 +67,7 @@ MockPersistableQueue::~MockPersistableQueue() } // static +/* void MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, qpid::broker::BrokerAsyncContext* bc) @@ -102,6 +106,7 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, if (bc) delete bc; if (res) delete res; } +*/ const qpid::broker::QueueHandle& MockPersistableQueue::getHandle() const @@ -124,10 +129,12 @@ MockPersistableQueue::getStore() void MockPersistableQueue::asyncCreate() { + qpid::broker::ResultCallback rcb = &qpid::broker::AsyncResultQueue::submit; if (m_store) { m_store->submitCreate(m_queueHandle, this, - &handleAsyncResult, + rcb, +// &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, new QueueAsyncContext(shared_from_this(), qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); ++m_asyncOpCounter; @@ -141,7 +148,7 @@ MockPersistableQueue::asyncDestroy(const bool deleteQueue) if (m_store) { if (deleteQueue) { m_store->submitDestroy(m_queueHandle, - &handleAsyncResult, + &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, new QueueAsyncContext(shared_from_this(), qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); ++m_asyncOpCounter; @@ -329,7 +336,7 @@ MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn, //std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; m_store->submitEnqueue(/*enqHandle*/qm.enqHandle(), txn->getHandle(), - &handleAsyncResult, + &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, new QueueAsyncContext(shared_from_this(), qm.payload(), qpid::asyncStore::AsyncOperation::MSG_ENQUEUE)); @@ -346,7 +353,7 @@ MockPersistableQueue::asyncDequeue(MockTransactionContext* txn, qpid::broker::EnqueueHandle enqHandle = qm.enqHandle(); m_store->submitDequeue(enqHandle, txn->getHandle(), - &handleAsyncResult, + &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/, new QueueAsyncContext(shared_from_this(), qm.payload(), qpid::asyncStore::AsyncOperation::MSG_DEQUEUE)); diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h index ff6db93542..e62aeec420 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h @@ -38,7 +38,9 @@ namespace qpid { namespace asyncStore { class AsyncStoreImpl; } - +namespace broker { +class AsyncResultQueue; +} namespace framing { class FieldTable; }} @@ -61,11 +63,12 @@ class MockPersistableQueue : public boost::enable_shared_from_this<MockPersistab public: MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& args, - qpid::asyncStore::AsyncStoreImpl* store); + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& rq); virtual ~MockPersistableQueue(); - static void handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerAsyncContext* bc); +// static void handleAsyncResult(const qpid::broker::AsyncResult* res, +// qpid::broker::BrokerAsyncContext* bc); const qpid::broker::QueueHandle& getHandle() const; qpid::broker::QueueHandle& getHandle(); qpid::asyncStore::AsyncStoreImpl* getStore(); @@ -99,6 +102,7 @@ public: protected: const std::string m_name; qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncResultQueue& m_resultQueue; AsyncOpCounter m_asyncOpCounter; mutable uint64_t m_persistenceId; std::string m_persistableData; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp index c444f596e5..0ac0c7732f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp @@ -64,6 +64,7 @@ MockTransactionContext::~MockTransactionContext() {} // static +/* void MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, qpid::broker::BrokerAsyncContext* bc) @@ -96,6 +97,7 @@ MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, if (bc) delete bc; if (res) delete res; } +*/ const qpid::broker::TxnHandle& MockTransactionContext::getHandle() const diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h index 3f70b0bfda..d727caede6 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h @@ -53,8 +53,8 @@ public: MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, const std::string& xid = std::string()); virtual ~MockTransactionContext(); - static void handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerAsyncContext* bc); +// static void handleAsyncResult(const qpid::broker::AsyncResult* res, +// qpid::broker::BrokerAsyncContext* bc); const qpid::broker::TxnHandle& getHandle() const; qpid::broker::TxnHandle& getHandle(); diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 184a899570..66e0bb3dbf 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -32,6 +32,7 @@ #include "tests/storePerftools/common/Thread.h" #include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/AsyncResultQueue.h" #include "qpid/sys/Poller.h" #include <iomanip> @@ -48,6 +49,7 @@ PerfTest::PerfTest(const TestOptions& to, m_msgData(new char[to.m_msgSize]), m_poller(new qpid::sys::Poller), m_pollingThread(m_poller.get()), + m_resultQueue(m_poller), m_store(0) { std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize); @@ -68,7 +70,7 @@ PerfTest::~PerfTest() void PerfTest::prepareStore() { - m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); + m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts, &m_resultQueue); m_store->initialize(); } @@ -86,7 +88,7 @@ PerfTest::prepareQueues() for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store)); + boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); mpq->asyncCreate(); m_queueList.push_back(mpq); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index 3bd3f6bd32..46455e4af0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -28,6 +28,7 @@ #include "tests/storePerftools/common/Streamable.h" +#include "qpid/broker/AsyncResultQueue.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Thread.h" @@ -69,6 +70,7 @@ protected: const char* m_msgData; boost::shared_ptr<qpid::sys::Poller> m_poller; qpid::sys::Thread m_pollingThread; + qpid::broker::AsyncResultQueue m_resultQueue; qpid::asyncStore::AsyncStoreImpl* m_store; std::deque<boost::shared_ptr<MockPersistableQueue> > m_queueList; std::deque<boost::shared_ptr<MessageProducer> > m_producers; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index 7903d6551a..802279bbf9 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -27,6 +27,7 @@ #include "MockPersistableQueue.h" #include "qpid/asyncStore/AsyncStoreImpl.h" +//#include "qpid/broker/EnqueueHandle.h" namespace tests { namespace storePerftools { @@ -40,7 +41,7 @@ QueuedMessage::QueuedMessage(MockPersistableQueue* q, boost::shared_ptr<MockPersistableMessage> msg) : m_queue(q), m_msg(msg), - m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle())) + m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0)) {} QueuedMessage::QueuedMessage(const QueuedMessage& qm) : diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp index 2f4461e8b5..dccfc4fcbf 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp @@ -86,7 +86,7 @@ TestOptions::doAddOptions() ("durable", qpid::optValue(m_durable), "Queues and messages are durable") ("destroy-queues", qpid::optValue(m_destroyQueuesOnCompletion), - "Destroy queue recoreds persistent store on test completion") + "Destroy queues in persistent store on test completion") ; } |