From a3861c46a7151a250fd06f54a60b9c1fe4bd6a1e Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 11 May 2012 12:19:07 +0000 Subject: QPID-3858: Added async queue deletion and mechanism to correctly wait for async completion of store deletion before destroying queue objects git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1337126 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp | 3 +- .../asyncPerf/MockPersistableMessage.cpp | 13 +++--- .../asyncPerf/MockPersistableMessage.h | 3 +- .../asyncPerf/MockPersistableQueue.cpp | 51 ++++++++++++++-------- .../asyncPerf/MockPersistableQueue.h | 15 ++++--- .../asyncPerf/MockTransactionContext.cpp | 17 +++++--- .../asyncPerf/MockTransactionContext.h | 6 +-- .../tests/storePerfTools/asyncPerf/PerfTest.cpp | 48 ++++++++++---------- cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h | 11 ++--- 9 files changed, 97 insertions(+), 70 deletions(-) diff --git a/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp b/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp index 939d65ddf9..6dac318d14 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreOptions.cpp @@ -58,7 +58,8 @@ AsyncStoreOptions::validate() {} //static -std::string& AsyncStoreOptions::getDefaultStoreDir() +std::string& +AsyncStoreOptions::getDefaultStoreDir() { static std::string s_defaultStoreDir = "/tmp"; return s_defaultStoreDir; diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp index 75fc921494..5cc829f4d2 100644 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp @@ -22,6 +22,7 @@ */ #include "MockPersistableMessage.h" +#include "MockPersistableQueue.h" // debug statements in enqueueComplete() and dequeueComplete() #include "qpid/asyncStore/AsyncStoreImpl.h" @@ -59,7 +60,7 @@ MockPersistableMessage::MessageContext::destroy() MockPersistableMessage::MockPersistableMessage(const char* msgData, const uint32_t msgSize, - AsyncStoreImplPtr store, + qpid::asyncStore::AsyncStoreImpl* store, const bool persistent) : m_persistenceId(0ULL), m_msg(msgData, static_cast(msgSize)), @@ -163,16 +164,18 @@ MockPersistableMessage::write(char* target) // protected void -MockPersistableMessage::enqueueComplete(const MessageContext* /*mc*/) +MockPersistableMessage::enqueueComplete(const MessageContext* mc) { -//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl; +//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush; + assert(mc->m_msg.get() == this); } // protected void -MockPersistableMessage::dequeueComplete(const MessageContext* /*mc*/) +MockPersistableMessage::dequeueComplete(const MessageContext* mc) { -//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl; +//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush; + assert(mc->m_msg.get() == this); } }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h index a139328690..fa9bc8e937 100644 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h @@ -44,7 +44,6 @@ namespace asyncPerf { class MockPersistableMessage; class MockPersistableQueue; -typedef boost::shared_ptr AsyncStoreImplPtr; typedef boost::shared_ptr MockPersistableMessagePtr; class MockPersistableMessage: public qpid::broker::PersistableMessage, qpid::broker::DataSource @@ -66,7 +65,7 @@ public: MockPersistableMessage(const char* msgData, const uint32_t msgSize, - AsyncStoreImplPtr store, + qpid::asyncStore::AsyncStoreImpl* store, const bool persistent = true); virtual ~MockPersistableMessage(); static void handleAsyncResult(const qpid::broker::AsyncResult* res, diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp index 3b92af9803..69af020a26 100644 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp @@ -37,7 +37,7 @@ namespace asyncPerf { // --- Inner class MockPersistableQueue::QueueContext --- -MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueue* q, +MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueuePtr q, const qpid::asyncStore::AsyncOperation::opCode op) : m_q(q), m_op(op) @@ -62,9 +62,10 @@ MockPersistableQueue::QueueContext::destroy() MockPersistableQueue::MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, - AsyncStoreImplPtr store, + qpid::asyncStore::AsyncStoreImpl* store, const TestOptions& to, const char* msgData) : + qpid::broker::PersistableQueue(), m_name(name), m_store(store), m_persistenceId(0ULL), @@ -74,11 +75,6 @@ MockPersistableQueue::MockPersistableQueue(const std::string& name, { const qpid::types::Variant::Map qo; m_queueHandle = m_store->createQueueHandle(m_name, qo); - qpid::broker::BrokerContext* bc = new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE); - m_store->submitCreate(m_queueHandle, - dynamic_cast(this), - &handleAsyncResult, - bc); } MockPersistableQueue::~MockPersistableQueue() @@ -131,6 +127,25 @@ MockPersistableQueue::getHandle() return m_queueHandle; } +// static +void +MockPersistableQueue::asyncStoreCreate(MockPersistableQueuePtr& qp) +{ + qp->m_store->submitCreate(qp->m_queueHandle, + dynamic_cast(qp.get()), + &handleAsyncResult, + new QueueContext(qp, qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); +} + +// static +void +MockPersistableQueue::asyncStoreDestroy(MockPersistableQueuePtr& qp) +{ + qp->m_store->submitDestroy(qp->m_queueHandle, + &handleAsyncResult, + new QueueContext(qp, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); +} + void* MockPersistableQueue::runEnqueues() { @@ -145,7 +160,9 @@ MockPersistableQueue::runEnqueues() MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true)); msg->setPersistenceId(m_store->getNextRid()); qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle); - MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, this); + MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, + qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, + this); if (useTxn) { m_store->submitEnqueue(enqHandle, txn->getHandle(), @@ -158,7 +175,6 @@ MockPersistableQueue::runEnqueues() } QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn)); push(qm); -//std::cout << "**** push 0x" << std::hex << msg->getPersistenceId() << std::dec << std::endl; if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) { txn->commit(); txnCnt = 0; @@ -207,8 +223,6 @@ MockPersistableQueue::runDequeues() txn->commit(); txnCnt = 0; } - } else { -// ::usleep(100); // 0.1 ms TODO: Use a condition variable instead of sleeping/spinning } } if (txnCnt > 0) { @@ -290,23 +304,26 @@ MockPersistableQueue::write(char* target) // protected void -MockPersistableQueue::createComplete(const QueueContext* /*qc*/) +MockPersistableQueue::createComplete(const QueueContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << m_name << "\": createComplete()" << std::endl; +//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": createComplete()" << std::endl << std::flush; + assert(qc->m_q.get() == this); } // protected void -MockPersistableQueue::flushComplete(const QueueContext* /*qc*/) +MockPersistableQueue::flushComplete(const QueueContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << m_name << "\": flushComplete()" << std::endl; +//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": flushComplete()" << std::endl << std::flush; + assert(qc->m_q.get() == this); } // protected void -MockPersistableQueue::destroyComplete(const QueueContext* /*qc*/) +MockPersistableQueue::destroyComplete(const QueueContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << m_name << "\": destroyComplete()" << std::endl; +//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": destroyComplete()" << std::endl << std::flush; + assert(qc->m_q.get() == this); } // protected diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h index a77f41743c..c3b461477c 100644 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h @@ -47,30 +47,31 @@ namespace tests { namespace storePerftools { namespace asyncPerf { +class MockPersistableQueue; class QueuedMessage; class TestOptions; -typedef boost::shared_ptr AsyncStoreImplPtr; +typedef boost::shared_ptr MockPersistableQueuePtr; typedef boost::shared_ptr QueuedMessagePtr; -class MockPersistableQueue : public qpid::broker::PersistableQueue, qpid::broker::DataSource +class MockPersistableQueue : public qpid::broker::PersistableQueue, public qpid::broker::DataSource { public: class QueueContext : public qpid::broker::BrokerContext { public: - QueueContext(MockPersistableQueue* q, + QueueContext(MockPersistableQueuePtr q, const qpid::asyncStore::AsyncOperation::opCode op); virtual ~QueueContext(); const char* getOp() const; void destroy(); - MockPersistableQueue* m_q; + MockPersistableQueuePtr m_q; const qpid::asyncStore::AsyncOperation::opCode m_op; }; MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& args, - AsyncStoreImplPtr store, + qpid::asyncStore::AsyncStoreImpl* store, const TestOptions& perfTestParams, const char* msgData); virtual ~MockPersistableQueue(); @@ -79,6 +80,8 @@ public: static void handleAsyncResult(const qpid::broker::AsyncResult* res, qpid::broker::BrokerContext* bc); qpid::broker::QueueHandle& getHandle(); + static void asyncStoreCreate(MockPersistableQueuePtr& qp); + static void asyncStoreDestroy(MockPersistableQueuePtr& qp); // --- Performance test thread entry points --- void* runEnqueues(); @@ -103,7 +106,7 @@ public: protected: const std::string m_name; - AsyncStoreImplPtr m_store; + qpid::asyncStore::AsyncStoreImpl* m_store; mutable uint64_t m_persistenceId; std::string m_persistableData; qpid::broker::QueueHandle m_queueHandle; diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp index e564876daa..10be34c6f5 100644 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp @@ -57,7 +57,7 @@ MockTransactionContext::TransactionContext::destroy() // --- Class MockTransactionContext --- -MockTransactionContext::MockTransactionContext(AsyncStoreImplPtr store, +MockTransactionContext::MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, const std::string& xid) : m_store(store), m_txnHandle(store->createTxnHandle(xid)), @@ -190,30 +190,33 @@ MockTransactionContext::localPrepare() // protected void -MockTransactionContext::prepareComplete(const TransactionContext* /*tc*/) +MockTransactionContext::prepareComplete(const TransactionContext* tc) { qpid::sys::ScopedLock l(m_enqueuedMsgsMutex); while (!m_enqueuedMsgs.empty()) { m_enqueuedMsgs.front()->clearTransaction(); m_enqueuedMsgs.pop_front(); } -//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": prepareComplete()" << std::endl; +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush; + assert(tc->m_tc == this); } // protected void -MockTransactionContext::abortComplete(const TransactionContext* /*tc*/) +MockTransactionContext::abortComplete(const TransactionContext* tc) { -//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": abortComplete()" << std::endl; +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush; + assert(tc->m_tc == this); } // protected void -MockTransactionContext::commitComplete(const TransactionContext* /*tc*/) +MockTransactionContext::commitComplete(const TransactionContext* tc) { -//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": commitComplete()" << std::endl; +//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush; + assert(tc->m_tc == this); } }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h index 35de7374c5..3c467a7b5d 100644 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h @@ -45,8 +45,6 @@ namespace asyncPerf { class QueuedMessage; -typedef boost::shared_ptr AsyncStoreImplPtr; - class MockTransactionContext : public qpid::broker::TransactionContext { public: @@ -65,7 +63,7 @@ public: const qpid::asyncStore::AsyncOperation::opCode m_op; }; - MockTransactionContext(AsyncStoreImplPtr store, + MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, const std::string& xid = std::string()); virtual ~MockTransactionContext(); static void handleAsyncResult(const qpid::broker::AsyncResult* res, @@ -81,7 +79,7 @@ public: void commit(); protected: - AsyncStoreImplPtr m_store; + qpid::asyncStore::AsyncStoreImpl* m_store; qpid::broker::TxnHandle m_txnHandle; bool m_prepared; std::deque m_enqueuedMsgs; diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp index fe66604774..983f088616 100644 --- a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp @@ -44,7 +44,8 @@ PerfTest::PerfTest(const TestOptions& to, m_testResult(to), m_msgData(new char[to.m_msgSize]), m_poller(new qpid::sys::Poller), - m_pollingThread(m_poller.get()) + m_pollingThread(m_poller.get()), + m_store(0) { std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize); } @@ -53,33 +54,38 @@ PerfTest::~PerfTest() { m_poller->shutdown(); m_pollingThread.join(); + + m_queueList.clear(); + + if (m_store) delete m_store; delete[] m_msgData; } -AsyncStoreImplPtr +void PerfTest::prepareStore() { - AsyncStoreImplPtr store(new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts)); - store->initialize(); - return store; + m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); + m_store->initialize(); } void -PerfTest::prepareQueues(std::deque& jrnlList, - AsyncStoreImplPtr store) +PerfTest::prepareQueues() { for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - MockPersistableQueuePtr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, store, m_testOpts, m_msgData)); - jrnlList.push_back(mpq); + MockPersistableQueuePtr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_testOpts, m_msgData)); + mpq->asyncStoreCreate(mpq); + m_queueList.push_back(mpq); } } void -PerfTest::destroyQueues(std::deque& jrnlList) +PerfTest::destroyQueues() { - jrnlList.clear(); + for (std::deque::iterator i=m_queueList.begin(); i!=m_queueList.end(); ++i) { + (*i)->asyncStoreDestroy(*i); + } } void @@ -87,10 +93,8 @@ PerfTest::run() { typedef boost::shared_ptr ThreadPtr; // TODO - replace with qpid threads - AsyncStoreImplPtr store = prepareStore(); - - std::deque queueList; - prepareQueues(queueList, store); + prepareStore(); + prepareQueues(); std::deque threads; { // --- Start of timed section --- @@ -98,13 +102,13 @@ PerfTest::run() for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) { for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads - ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startEnqueues, - reinterpret_cast(queueList[q].get()))); + ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startEnqueues, + reinterpret_cast(m_queueList[q].get()))); threads.push_back(tp); } for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads - ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startDequeues, - reinterpret_cast(queueList[q].get()))); + ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startDequeues, + reinterpret_cast(m_queueList[q].get()))); threads.push_back(tp); } } @@ -113,10 +117,8 @@ PerfTest::run() threads.pop_front(); } } // --- End of timed section --- - - destroyQueues(queueList); -// DEBUG MEASURE - REMOVE WHEN FIXED -//::sleep(2); + // TODO: Add test param to allow queues to be destroyed or left when test ends + destroyQueues(); } void diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h index 1eb11a51fa..544cd6b3ae 100644 --- a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h @@ -48,7 +48,7 @@ namespace asyncPerf { class MockPersistableQueue; class TestOptions; -typedef boost::shared_ptr AsyncStoreImplPtr; +//typedef boost::shared_ptr AsyncStoreImplPtr; typedef boost::shared_ptr MockPersistableQueuePtr; class PerfTest : public tests::storePerftools::common::Streamable @@ -68,11 +68,12 @@ protected: const char* m_msgData; boost::shared_ptr m_poller; qpid::sys::Thread m_pollingThread; + qpid::asyncStore::AsyncStoreImpl* m_store; + std::deque m_queueList; - AsyncStoreImplPtr prepareStore(); - void prepareQueues(std::deque& jrnlList, - AsyncStoreImplPtr store); - void destroyQueues(std::deque& jrnlList); + void prepareStore(); + void prepareQueues(); + void destroyQueues(); }; -- cgit v1.2.1