summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp344
1 files changed, 210 insertions, 134 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
index ede0830045..009f54a157 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
@@ -23,16 +23,13 @@
#include "MockPersistableQueue.h"
-#include "MessageAsyncContext.h"
+#include "MessageDeque.h"
#include "MockPersistableMessage.h"
#include "MockTransactionContext.h"
#include "QueueAsyncContext.h"
#include "QueuedMessage.h"
-#include "TestOptions.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
-#include "qpid/broker/BrokerAsyncContext.h"
-#include "qpid/broker/EnqueueHandle.h"
namespace tests {
namespace storePerftools {
@@ -40,19 +37,22 @@ namespace asyncPerf {
MockPersistableQueue::MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
- qpid::asyncStore::AsyncStoreImpl* store,
- const TestOptions& to,
- const char* msgData) :
+ qpid::asyncStore::AsyncStoreImpl* store) :
qpid::broker::PersistableQueue(),
m_name(name),
m_store(store),
+ m_asyncOpCounter(0UL),
m_persistenceId(0ULL),
m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this.
- m_perfTestOpts(to),
- m_msgData(msgData)
+ m_destroyPending(false),
+ m_destroyed(false),
+ m_barrier(*this),
+ m_messages(new MessageDeque())
{
- const qpid::types::Variant::Map qo;
- m_queueHandle = m_store->createQueueHandle(m_name, qo);
+ if (m_store != 0) {
+ const qpid::types::Variant::Map qo;
+ m_queueHandle = m_store->createQueueHandle(m_name, qo);
+ }
}
MockPersistableQueue::~MockPersistableQueue()
@@ -71,7 +71,7 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
if (bc && res) {
QueueAsyncContext* qc = dynamic_cast<QueueAsyncContext*>(bc);
if (res->errNo) {
- // TODO: Handle async failure here
+ // TODO: Handle async failure here (other than by simply printing a message)
std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
<< res->errNo << " (" << res->errMsg << ")" << std::endl;
} else {
@@ -86,6 +86,12 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY:
qc->getQueue()->destroyComplete(qc);
break;
+ case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE:
+ qc->getQueue()->enqueueComplete(qc);
+ break;
+ case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE:
+ qc->getQueue()->dequeueComplete(qc);
+ break;
default:
std::ostringstream oss;
oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode();
@@ -97,127 +103,100 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
if (res) delete res;
}
+const qpid::broker::QueueHandle&
+MockPersistableQueue::getHandle() const
+{
+ return m_queueHandle;
+}
+
qpid::broker::QueueHandle&
MockPersistableQueue::getHandle()
{
return m_queueHandle;
}
-void
-MockPersistableQueue::asyncStoreCreate()
+qpid::asyncStore::AsyncStoreImpl*
+MockPersistableQueue::getStore()
{
- m_store->submitCreate(m_queueHandle,
- this,
- &handleAsyncResult,
- new QueueAsyncContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE));
+ return m_store;
}
void
-MockPersistableQueue::asyncStoreDestroy()
+MockPersistableQueue::asyncCreate()
{
- m_store->submitDestroy(m_queueHandle,
- &handleAsyncResult,
- new QueueAsyncContext(this, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY));
+ if (m_store) {
+ m_store->submitCreate(m_queueHandle,
+ this,
+ &handleAsyncResult,
+ new QueueAsyncContext(shared_from_this(),
+ qpid::asyncStore::AsyncOperation::QUEUE_CREATE));
+ ++m_asyncOpCounter;
+ }
}
-void*
-MockPersistableQueue::runEnqueues()
+void
+MockPersistableQueue::asyncDestroy(const bool deleteQueue)
{
- uint32_t numMsgs = 0;
- uint16_t txnCnt = 0;
- const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0;
- MockTransactionContextPtr txn;
- while (numMsgs < m_perfTestOpts.m_numMsgs) {
- if (useTxn && txnCnt == 0) {
- txn.reset(new MockTransactionContext(m_store)); // equivalent to begin()
+ m_destroyPending = true;
+ if (m_store) {
+ if (deleteQueue) {
+ m_store->submitDestroy(m_queueHandle,
+ &handleAsyncResult,
+ new QueueAsyncContext(shared_from_this(),
+ qpid::asyncStore::AsyncOperation::QUEUE_DESTROY));
+ ++m_asyncOpCounter;
}
- MockPersistableMessage::shared_ptr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true));
- msg->setPersistenceId(m_store->getNextRid());
- qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle);
- MessageContext* msgCtxt = new MessageContext(msg,
- qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
- this);
- if (useTxn) {
- m_store->submitEnqueue(enqHandle,
- txn->getHandle(),
- &MockPersistableMessage::handleAsyncResult,
- dynamic_cast<qpid::broker::BrokerAsyncContext*>(msgCtxt));
- } else {
- m_store->submitEnqueue(enqHandle,
- &MockPersistableMessage::handleAsyncResult,
- dynamic_cast<qpid::broker::BrokerAsyncContext*>(msgCtxt));
- }
- QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn));
- push(qm);
- if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) {
- txn->commit();
- txnCnt = 0;
- }
- ++numMsgs;
- }
- if (txnCnt > 0) {
- txn->commit();
- txnCnt = 0;
+ m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000));
}
- return 0;
}
-void*
-MockPersistableQueue::runDequeues()
+void
+MockPersistableQueue::deliver(boost::shared_ptr<MockPersistableMessage> msg)
{
- uint32_t numMsgs = 0;
- const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue;
- uint16_t txnCnt = 0;
- const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0;
- MockTransactionContextPtr txn;
- QueuedMessagePtr qm;
- while (numMsgs < numMsgsToDequeue) {
- if (useTxn && txnCnt == 0) {
- txn.reset(new MockTransactionContext(m_store)); // equivalent to begin()
- }
- pop(qm);
- if (qm.get()) {
- qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle();
- qpid::broker::BrokerAsyncContext* bc = new MessageContext(qm->getMessage(),
- qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
- this);
- if (useTxn) {
- m_store->submitDequeue(enqHandle,
- txn->getHandle(),
- &MockPersistableMessage::handleAsyncResult,
- bc);
- } else {
- m_store->submitDequeue(enqHandle,
- &MockPersistableMessage::handleAsyncResult,
- bc);
- }
- ++numMsgs;
- qm.reset(static_cast<QueuedMessage*>(0));
- if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) {
- txn->commit();
- txnCnt = 0;
- }
- }
+ QueuedMessage qm(this, msg);
+ if(enqueue((MockTransactionContext*)0, qm)) {
+ push(qm);
}
- if (txnCnt > 0) {
- txn->commit();
- txnCnt = 0;
+}
+
+bool
+MockPersistableQueue::dispatch()
+{
+ QueuedMessage qm;
+ if (m_messages->consume(qm)) {
+ return dequeue((MockTransactionContext*)0, qm);
}
- return 0;
+ return false;
}
-//static
-void*
-MockPersistableQueue::startEnqueues(void* ptr)
+bool
+MockPersistableQueue::enqueue(MockTransactionContext* ctxt,
+ QueuedMessage& qm)
{
- return reinterpret_cast<MockPersistableQueue*>(ptr)->runEnqueues();
+ ScopedUse u(m_barrier);
+ if (!u.m_acquired) {
+ return false;
+ }
+ if (qm.payload()->isPersistent() && m_store) {
+ qm.payload()->enqueueAsync(shared_from_this(), m_store);
+ return asyncEnqueue(ctxt, qm);
+ }
+ return false;
}
-//static
-void*
-MockPersistableQueue::startDequeues(void* ptr)
+bool
+MockPersistableQueue::dequeue(MockTransactionContext* ctxt,
+ QueuedMessage& qm)
{
- return reinterpret_cast<MockPersistableQueue*>(ptr)->runDequeues();
+ ScopedUse u(m_barrier);
+ if (!u.m_acquired) {
+ return false;
+ }
+ if (qm.payload()->isPersistent() && m_store) {
+ qm.payload()->dequeueAsync(shared_from_this(), m_store);
+ return asyncDequeue(ctxt, qm);
+ }
+ return false;
}
void
@@ -276,61 +255,158 @@ MockPersistableQueue::write(char* target)
::memcpy(target, m_persistableData.data(), m_persistableData.size());
}
+// --- Members & methods in msg handling path from qpid::Queue ---
+
+// protected
+MockPersistableQueue::UsageBarrier::UsageBarrier(MockPersistableQueue& q) :
+ m_parent(q),
+ m_count(0)
+{}
+
+// protected
+bool
+MockPersistableQueue::UsageBarrier::acquire()
+{
+ qpid::sys::Monitor::ScopedLock l(m_monitor);
+ if (m_parent.m_destroyed) {
+ return false;
+ } else {
+ ++m_count;
+ return true;
+ }
+}
+
+// protected
+void MockPersistableQueue::UsageBarrier::release()
+{
+ qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
+ if (--m_count == 0) {
+ m_monitor.notifyAll();
+ }
+}
+
+// protected
+void MockPersistableQueue::UsageBarrier::destroy()
+{
+ qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
+ m_parent.m_destroyed = true;
+ while (m_count) {
+ m_monitor.wait();
+ }
+}
+
+// protected
+MockPersistableQueue::ScopedUse::ScopedUse(UsageBarrier& b) :
+ m_barrier(b),
+ m_acquired(m_barrier.acquire())
+{}
+
+// protected
+MockPersistableQueue::ScopedUse::~ScopedUse()
+{
+ if (m_acquired) {
+ m_barrier.release();
+ }
+}
+
+// protected
+void
+MockPersistableQueue::push(QueuedMessage& qm,
+ bool /*isRecovery*/)
+{
+ QueuedMessage removed;
+ m_messages->push(qm, removed);
+}
+
+// --- End Members & methods in msg handling path from qpid::Queue ---
+
+// protected
+bool
+MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn,
+ QueuedMessage& qm)
+{
+ qm.payload()->setPersistenceId(m_store->getNextRid());
+//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
+ m_store->submitEnqueue(/*enqHandle*/qm.enqHandle(),
+ txn->getHandle(),
+ &handleAsyncResult,
+ new QueueAsyncContext(shared_from_this(),
+ qm.payload(),
+ qpid::asyncStore::AsyncOperation::MSG_ENQUEUE));
+ ++m_asyncOpCounter;
+ return true;
+}
+
+// protected
+bool
+MockPersistableQueue::asyncDequeue(MockTransactionContext* txn,
+ QueuedMessage& qm)
+{
+//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
+ qpid::broker::EnqueueHandle enqHandle = qm.enqHandle();
+ m_store->submitDequeue(enqHandle,
+ txn->getHandle(),
+ &handleAsyncResult,
+ new QueueAsyncContext(shared_from_this(),
+ qm.payload(),
+ qpid::asyncStore::AsyncOperation::MSG_DEQUEUE));
+ ++m_asyncOpCounter;
+ return true;
+}
+
+// protected
+void
+MockPersistableQueue::destroyCheck(const std::string& opDescr) const
+{
+ if (m_destroyPending || m_destroyed) {
+ std::ostringstream oss;
+ oss << opDescr << " on queue \"" << m_name << "\" after call to destroy";
+ throw qpid::Exception(oss.str());
+ }
+}
+
// protected
void
MockPersistableQueue::createComplete(const QueueAsyncContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": createComplete()" << std::endl << std::flush;
+//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
+ --m_asyncOpCounter;
}
// protected
void
MockPersistableQueue::flushComplete(const QueueAsyncContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": flushComplete()" << std::endl << std::flush;
+//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
+ --m_asyncOpCounter;
}
// protected
void
MockPersistableQueue::destroyComplete(const QueueAsyncContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": destroyComplete()" << std::endl << std::flush;
+//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
+ --m_asyncOpCounter;
+ m_destroyed = true;
}
-// protected
void
-MockPersistableQueue::push(QueuedMessagePtr& qm)
+MockPersistableQueue::enqueueComplete(const QueueAsyncContext* qc)
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
- m_enqueuedMsgs.push_back(qm);
- m_dequeueCondition.notify();
+//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
+ assert(qc->getQueue().get() == this);
+ --m_asyncOpCounter;
}
-// protected
void
-MockPersistableQueue::pop(QueuedMessagePtr& qm)
+MockPersistableQueue::dequeueComplete(const QueueAsyncContext* qc)
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
- while (m_enqueuedMsgs.empty()) {
- m_dequeueCondition.wait(m_enqueuedMsgsMutex);
- }
- qm = m_enqueuedMsgs.front();
- if (qm->isTransactional()) {
- // The next msg is still in an open transaction, skip and find next non-open-txn msg
- MsgEnqListItr i = m_enqueuedMsgs.begin();
- while (++i != m_enqueuedMsgs.end()) {
- if (!(*i)->isTransactional()) {
- qm = *i;
- m_enqueuedMsgs.erase(i);
- }
- }
- } else {
- // The next msg is not in an open txn
- m_enqueuedMsgs.pop_front();
- }
+//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
+ assert(qc->getQueue().get() == this);
+ --m_asyncOpCounter;
}
}}} // namespace tests::storePerftools::asyncPerf