diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp | 344 |
1 files changed, 210 insertions, 134 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp index ede0830045..009f54a157 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp @@ -23,16 +23,13 @@ #include "MockPersistableQueue.h" -#include "MessageAsyncContext.h" +#include "MessageDeque.h" #include "MockPersistableMessage.h" #include "MockTransactionContext.h" #include "QueueAsyncContext.h" #include "QueuedMessage.h" -#include "TestOptions.h" #include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/BrokerAsyncContext.h" -#include "qpid/broker/EnqueueHandle.h" namespace tests { namespace storePerftools { @@ -40,19 +37,22 @@ namespace asyncPerf { MockPersistableQueue::MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store, - const TestOptions& to, - const char* msgData) : + qpid::asyncStore::AsyncStoreImpl* store) : qpid::broker::PersistableQueue(), m_name(name), m_store(store), + m_asyncOpCounter(0UL), m_persistenceId(0ULL), m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. - m_perfTestOpts(to), - m_msgData(msgData) + m_destroyPending(false), + m_destroyed(false), + m_barrier(*this), + m_messages(new MessageDeque()) { - const qpid::types::Variant::Map qo; - m_queueHandle = m_store->createQueueHandle(m_name, qo); + if (m_store != 0) { + const qpid::types::Variant::Map qo; + m_queueHandle = m_store->createQueueHandle(m_name, qo); + } } MockPersistableQueue::~MockPersistableQueue() @@ -71,7 +71,7 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, if (bc && res) { QueueAsyncContext* qc = dynamic_cast<QueueAsyncContext*>(bc); if (res->errNo) { - // TODO: Handle async failure here + // TODO: Handle async failure here (other than by simply printing a message) std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl; } else { @@ -86,6 +86,12 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: qc->getQueue()->destroyComplete(qc); break; + case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: + qc->getQueue()->enqueueComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: + qc->getQueue()->dequeueComplete(qc); + break; default: std::ostringstream oss; oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); @@ -97,127 +103,100 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, if (res) delete res; } +const qpid::broker::QueueHandle& +MockPersistableQueue::getHandle() const +{ + return m_queueHandle; +} + qpid::broker::QueueHandle& MockPersistableQueue::getHandle() { return m_queueHandle; } -void -MockPersistableQueue::asyncStoreCreate() +qpid::asyncStore::AsyncStoreImpl* +MockPersistableQueue::getStore() { - m_store->submitCreate(m_queueHandle, - this, - &handleAsyncResult, - new QueueAsyncContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); + return m_store; } void -MockPersistableQueue::asyncStoreDestroy() +MockPersistableQueue::asyncCreate() { - m_store->submitDestroy(m_queueHandle, - &handleAsyncResult, - new QueueAsyncContext(this, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); + if (m_store) { + m_store->submitCreate(m_queueHandle, + this, + &handleAsyncResult, + new QueueAsyncContext(shared_from_this(), + qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); + ++m_asyncOpCounter; + } } -void* -MockPersistableQueue::runEnqueues() +void +MockPersistableQueue::asyncDestroy(const bool deleteQueue) { - uint32_t numMsgs = 0; - uint16_t txnCnt = 0; - const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0; - MockTransactionContextPtr txn; - while (numMsgs < m_perfTestOpts.m_numMsgs) { - if (useTxn && txnCnt == 0) { - txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() + m_destroyPending = true; + if (m_store) { + if (deleteQueue) { + m_store->submitDestroy(m_queueHandle, + &handleAsyncResult, + new QueueAsyncContext(shared_from_this(), + qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); + ++m_asyncOpCounter; } - MockPersistableMessage::shared_ptr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true)); - msg->setPersistenceId(m_store->getNextRid()); - qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle); - MessageContext* msgCtxt = new MessageContext(msg, - qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, - this); - if (useTxn) { - m_store->submitEnqueue(enqHandle, - txn->getHandle(), - &MockPersistableMessage::handleAsyncResult, - dynamic_cast<qpid::broker::BrokerAsyncContext*>(msgCtxt)); - } else { - m_store->submitEnqueue(enqHandle, - &MockPersistableMessage::handleAsyncResult, - dynamic_cast<qpid::broker::BrokerAsyncContext*>(msgCtxt)); - } - QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn)); - push(qm); - if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) { - txn->commit(); - txnCnt = 0; - } - ++numMsgs; - } - if (txnCnt > 0) { - txn->commit(); - txnCnt = 0; + m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); } - return 0; } -void* -MockPersistableQueue::runDequeues() +void +MockPersistableQueue::deliver(boost::shared_ptr<MockPersistableMessage> msg) { - uint32_t numMsgs = 0; - const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue; - uint16_t txnCnt = 0; - const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0; - MockTransactionContextPtr txn; - QueuedMessagePtr qm; - while (numMsgs < numMsgsToDequeue) { - if (useTxn && txnCnt == 0) { - txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() - } - pop(qm); - if (qm.get()) { - qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle(); - qpid::broker::BrokerAsyncContext* bc = new MessageContext(qm->getMessage(), - qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, - this); - if (useTxn) { - m_store->submitDequeue(enqHandle, - txn->getHandle(), - &MockPersistableMessage::handleAsyncResult, - bc); - } else { - m_store->submitDequeue(enqHandle, - &MockPersistableMessage::handleAsyncResult, - bc); - } - ++numMsgs; - qm.reset(static_cast<QueuedMessage*>(0)); - if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) { - txn->commit(); - txnCnt = 0; - } - } + QueuedMessage qm(this, msg); + if(enqueue((MockTransactionContext*)0, qm)) { + push(qm); } - if (txnCnt > 0) { - txn->commit(); - txnCnt = 0; +} + +bool +MockPersistableQueue::dispatch() +{ + QueuedMessage qm; + if (m_messages->consume(qm)) { + return dequeue((MockTransactionContext*)0, qm); } - return 0; + return false; } -//static -void* -MockPersistableQueue::startEnqueues(void* ptr) +bool +MockPersistableQueue::enqueue(MockTransactionContext* ctxt, + QueuedMessage& qm) { - return reinterpret_cast<MockPersistableQueue*>(ptr)->runEnqueues(); + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm.payload()->isPersistent() && m_store) { + qm.payload()->enqueueAsync(shared_from_this(), m_store); + return asyncEnqueue(ctxt, qm); + } + return false; } -//static -void* -MockPersistableQueue::startDequeues(void* ptr) +bool +MockPersistableQueue::dequeue(MockTransactionContext* ctxt, + QueuedMessage& qm) { - return reinterpret_cast<MockPersistableQueue*>(ptr)->runDequeues(); + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm.payload()->isPersistent() && m_store) { + qm.payload()->dequeueAsync(shared_from_this(), m_store); + return asyncDequeue(ctxt, qm); + } + return false; } void @@ -276,61 +255,158 @@ MockPersistableQueue::write(char* target) ::memcpy(target, m_persistableData.data(), m_persistableData.size()); } +// --- Members & methods in msg handling path from qpid::Queue --- + +// protected +MockPersistableQueue::UsageBarrier::UsageBarrier(MockPersistableQueue& q) : + m_parent(q), + m_count(0) +{} + +// protected +bool +MockPersistableQueue::UsageBarrier::acquire() +{ + qpid::sys::Monitor::ScopedLock l(m_monitor); + if (m_parent.m_destroyed) { + return false; + } else { + ++m_count; + return true; + } +} + +// protected +void MockPersistableQueue::UsageBarrier::release() +{ + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + if (--m_count == 0) { + m_monitor.notifyAll(); + } +} + +// protected +void MockPersistableQueue::UsageBarrier::destroy() +{ + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + m_parent.m_destroyed = true; + while (m_count) { + m_monitor.wait(); + } +} + +// protected +MockPersistableQueue::ScopedUse::ScopedUse(UsageBarrier& b) : + m_barrier(b), + m_acquired(m_barrier.acquire()) +{} + +// protected +MockPersistableQueue::ScopedUse::~ScopedUse() +{ + if (m_acquired) { + m_barrier.release(); + } +} + +// protected +void +MockPersistableQueue::push(QueuedMessage& qm, + bool /*isRecovery*/) +{ + QueuedMessage removed; + m_messages->push(qm, removed); +} + +// --- End Members & methods in msg handling path from qpid::Queue --- + +// protected +bool +MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn, + QueuedMessage& qm) +{ + qm.payload()->setPersistenceId(m_store->getNextRid()); +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + m_store->submitEnqueue(/*enqHandle*/qm.enqHandle(), + txn->getHandle(), + &handleAsyncResult, + new QueueAsyncContext(shared_from_this(), + qm.payload(), + qpid::asyncStore::AsyncOperation::MSG_ENQUEUE)); + ++m_asyncOpCounter; + return true; +} + +// protected +bool +MockPersistableQueue::asyncDequeue(MockTransactionContext* txn, + QueuedMessage& qm) +{ +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + qpid::broker::EnqueueHandle enqHandle = qm.enqHandle(); + m_store->submitDequeue(enqHandle, + txn->getHandle(), + &handleAsyncResult, + new QueueAsyncContext(shared_from_this(), + qm.payload(), + qpid::asyncStore::AsyncOperation::MSG_DEQUEUE)); + ++m_asyncOpCounter; + return true; +} + +// protected +void +MockPersistableQueue::destroyCheck(const std::string& opDescr) const +{ + if (m_destroyPending || m_destroyed) { + std::ostringstream oss; + oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; + throw qpid::Exception(oss.str()); + } +} + // protected void MockPersistableQueue::createComplete(const QueueAsyncContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": createComplete()" << std::endl << std::flush; +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); + --m_asyncOpCounter; } // protected void MockPersistableQueue::flushComplete(const QueueAsyncContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": flushComplete()" << std::endl << std::flush; +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); + --m_asyncOpCounter; } // protected void MockPersistableQueue::destroyComplete(const QueueAsyncContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": destroyComplete()" << std::endl << std::flush; +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); + --m_asyncOpCounter; + m_destroyed = true; } -// protected void -MockPersistableQueue::push(QueuedMessagePtr& qm) +MockPersistableQueue::enqueueComplete(const QueueAsyncContext* qc) { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); - m_enqueuedMsgs.push_back(qm); - m_dequeueCondition.notify(); +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; } -// protected void -MockPersistableQueue::pop(QueuedMessagePtr& qm) +MockPersistableQueue::dequeueComplete(const QueueAsyncContext* qc) { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); - while (m_enqueuedMsgs.empty()) { - m_dequeueCondition.wait(m_enqueuedMsgsMutex); - } - qm = m_enqueuedMsgs.front(); - if (qm->isTransactional()) { - // The next msg is still in an open transaction, skip and find next non-open-txn msg - MsgEnqListItr i = m_enqueuedMsgs.begin(); - while (++i != m_enqueuedMsgs.end()) { - if (!(*i)->isTransactional()) { - qm = *i; - m_enqueuedMsgs.erase(i); - } - } - } else { - // The next msg is not in an open txn - m_enqueuedMsgs.pop_front(); - } +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; } }}} // namespace tests::storePerftools::asyncPerf |