diff options
Diffstat (limited to 'cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp')
-rw-r--r-- | cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp | 51 |
1 files changed, 34 insertions, 17 deletions
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp index 3b92af9803..69af020a26 100644 --- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp @@ -37,7 +37,7 @@ namespace asyncPerf { // --- Inner class MockPersistableQueue::QueueContext --- -MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueue* q, +MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueuePtr q, const qpid::asyncStore::AsyncOperation::opCode op) : m_q(q), m_op(op) @@ -62,9 +62,10 @@ MockPersistableQueue::QueueContext::destroy() MockPersistableQueue::MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, - AsyncStoreImplPtr store, + qpid::asyncStore::AsyncStoreImpl* store, const TestOptions& to, const char* msgData) : + qpid::broker::PersistableQueue(), m_name(name), m_store(store), m_persistenceId(0ULL), @@ -74,11 +75,6 @@ MockPersistableQueue::MockPersistableQueue(const std::string& name, { const qpid::types::Variant::Map qo; m_queueHandle = m_store->createQueueHandle(m_name, qo); - qpid::broker::BrokerContext* bc = new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE); - m_store->submitCreate(m_queueHandle, - dynamic_cast<const qpid::broker::DataSource*>(this), - &handleAsyncResult, - bc); } MockPersistableQueue::~MockPersistableQueue() @@ -131,6 +127,25 @@ MockPersistableQueue::getHandle() return m_queueHandle; } +// static +void +MockPersistableQueue::asyncStoreCreate(MockPersistableQueuePtr& qp) +{ + qp->m_store->submitCreate(qp->m_queueHandle, + dynamic_cast<const qpid::broker::DataSource*>(qp.get()), + &handleAsyncResult, + new QueueContext(qp, qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); +} + +// static +void +MockPersistableQueue::asyncStoreDestroy(MockPersistableQueuePtr& qp) +{ + qp->m_store->submitDestroy(qp->m_queueHandle, + &handleAsyncResult, + new QueueContext(qp, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); +} + void* MockPersistableQueue::runEnqueues() { @@ -145,7 +160,9 @@ MockPersistableQueue::runEnqueues() MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true)); msg->setPersistenceId(m_store->getNextRid()); qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle); - MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, this); + MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, + qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, + this); if (useTxn) { m_store->submitEnqueue(enqHandle, txn->getHandle(), @@ -158,7 +175,6 @@ MockPersistableQueue::runEnqueues() } QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn)); push(qm); -//std::cout << "**** push 0x" << std::hex << msg->getPersistenceId() << std::dec << std::endl; if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) { txn->commit(); txnCnt = 0; @@ -207,8 +223,6 @@ MockPersistableQueue::runDequeues() txn->commit(); txnCnt = 0; } - } else { -// ::usleep(100); // 0.1 ms TODO: Use a condition variable instead of sleeping/spinning } } if (txnCnt > 0) { @@ -290,23 +304,26 @@ MockPersistableQueue::write(char* target) // protected void -MockPersistableQueue::createComplete(const QueueContext* /*qc*/) +MockPersistableQueue::createComplete(const QueueContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << m_name << "\": createComplete()" << std::endl; +//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": createComplete()" << std::endl << std::flush; + assert(qc->m_q.get() == this); } // protected void -MockPersistableQueue::flushComplete(const QueueContext* /*qc*/) +MockPersistableQueue::flushComplete(const QueueContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << m_name << "\": flushComplete()" << std::endl; +//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": flushComplete()" << std::endl << std::flush; + assert(qc->m_q.get() == this); } // protected void -MockPersistableQueue::destroyComplete(const QueueContext* /*qc*/) +MockPersistableQueue::destroyComplete(const QueueContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << m_name << "\": destroyComplete()" << std::endl; +//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": destroyComplete()" << std::endl << std::flush; + assert(qc->m_q.get() == this); } // protected |