summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp')
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp51
1 files changed, 34 insertions, 17 deletions
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
index 3b92af9803..69af020a26 100644
--- a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
@@ -37,7 +37,7 @@ namespace asyncPerf {
// --- Inner class MockPersistableQueue::QueueContext ---
-MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueue* q,
+MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueuePtr q,
const qpid::asyncStore::AsyncOperation::opCode op) :
m_q(q),
m_op(op)
@@ -62,9 +62,10 @@ MockPersistableQueue::QueueContext::destroy()
MockPersistableQueue::MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
- AsyncStoreImplPtr store,
+ qpid::asyncStore::AsyncStoreImpl* store,
const TestOptions& to,
const char* msgData) :
+ qpid::broker::PersistableQueue(),
m_name(name),
m_store(store),
m_persistenceId(0ULL),
@@ -74,11 +75,6 @@ MockPersistableQueue::MockPersistableQueue(const std::string& name,
{
const qpid::types::Variant::Map qo;
m_queueHandle = m_store->createQueueHandle(m_name, qo);
- qpid::broker::BrokerContext* bc = new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE);
- m_store->submitCreate(m_queueHandle,
- dynamic_cast<const qpid::broker::DataSource*>(this),
- &handleAsyncResult,
- bc);
}
MockPersistableQueue::~MockPersistableQueue()
@@ -131,6 +127,25 @@ MockPersistableQueue::getHandle()
return m_queueHandle;
}
+// static
+void
+MockPersistableQueue::asyncStoreCreate(MockPersistableQueuePtr& qp)
+{
+ qp->m_store->submitCreate(qp->m_queueHandle,
+ dynamic_cast<const qpid::broker::DataSource*>(qp.get()),
+ &handleAsyncResult,
+ new QueueContext(qp, qpid::asyncStore::AsyncOperation::QUEUE_CREATE));
+}
+
+// static
+void
+MockPersistableQueue::asyncStoreDestroy(MockPersistableQueuePtr& qp)
+{
+ qp->m_store->submitDestroy(qp->m_queueHandle,
+ &handleAsyncResult,
+ new QueueContext(qp, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY));
+}
+
void*
MockPersistableQueue::runEnqueues()
{
@@ -145,7 +160,9 @@ MockPersistableQueue::runEnqueues()
MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true));
msg->setPersistenceId(m_store->getNextRid());
qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle);
- MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, this);
+ MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg,
+ qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
+ this);
if (useTxn) {
m_store->submitEnqueue(enqHandle,
txn->getHandle(),
@@ -158,7 +175,6 @@ MockPersistableQueue::runEnqueues()
}
QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn));
push(qm);
-//std::cout << "**** push 0x" << std::hex << msg->getPersistenceId() << std::dec << std::endl;
if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) {
txn->commit();
txnCnt = 0;
@@ -207,8 +223,6 @@ MockPersistableQueue::runDequeues()
txn->commit();
txnCnt = 0;
}
- } else {
-// ::usleep(100); // 0.1 ms TODO: Use a condition variable instead of sleeping/spinning
}
}
if (txnCnt > 0) {
@@ -290,23 +304,26 @@ MockPersistableQueue::write(char* target)
// protected
void
-MockPersistableQueue::createComplete(const QueueContext* /*qc*/)
+MockPersistableQueue::createComplete(const QueueContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << m_name << "\": createComplete()" << std::endl;
+//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": createComplete()" << std::endl << std::flush;
+ assert(qc->m_q.get() == this);
}
// protected
void
-MockPersistableQueue::flushComplete(const QueueContext* /*qc*/)
+MockPersistableQueue::flushComplete(const QueueContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << m_name << "\": flushComplete()" << std::endl;
+//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": flushComplete()" << std::endl << std::flush;
+ assert(qc->m_q.get() == this);
}
// protected
void
-MockPersistableQueue::destroyComplete(const QueueContext* /*qc*/)
+MockPersistableQueue::destroyComplete(const QueueContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << m_name << "\": destroyComplete()" << std::endl;
+//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": destroyComplete()" << std::endl << std::flush;
+ assert(qc->m_q.get() == this);
}
// protected