summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp')
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp345
1 files changed, 345 insertions, 0 deletions
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
new file mode 100644
index 0000000000..ff9b76c421
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockPersistableQueue.cpp
+ */
+
+#include "MockPersistableQueue.h"
+
+#include "MockPersistableMessage.h"
+#include "MockTransactionContext.h"
+#include "QueuedMessage.h"
+#include "TestOptions.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/broker/EnqueueHandle.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+// --- Inner class MockPersistableQueue::QueueContext ---
+
+MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueue* q,
+ const qpid::asyncStore::AsyncOperation::opCode op) :
+ m_q(q),
+ m_op(op)
+{}
+
+MockPersistableQueue::QueueContext::~QueueContext()
+{}
+
+const char*
+MockPersistableQueue::QueueContext::getOp() const
+{
+ return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
+}
+
+void
+MockPersistableQueue::QueueContext::destroy()
+{
+ delete this;
+}
+
+// --- Class MockPersistableQueue ---
+
+MockPersistableQueue::MockPersistableQueue(const std::string& name,
+ const qpid::framing::FieldTable& /*args*/,
+ AsyncStoreImplPtr store,
+ const TestOptions& to,
+ const char* msgData) :
+ m_name(name),
+ m_store(store),
+ m_persistenceId(0ULL),
+ m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this.
+ m_perfTestOpts(to),
+ m_msgData(msgData)
+{
+ const qpid::types::Variant::Map qo;
+ m_queueHandle = m_store->createQueueHandle(m_name, qo);
+ qpid::broker::BrokerContext* bc = new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE);
+ m_store->submitCreate(m_queueHandle,
+ dynamic_cast<const qpid::broker::DataSource*>(this),
+ &handleAsyncResult,
+ bc);
+}
+
+MockPersistableQueue::~MockPersistableQueue()
+{
+// m_store->flush(*this);
+ // TODO: Make destroying the store a test parameter
+// m_store->destroy(*this);
+// m_store = 0;
+}
+
+// static
+void
+MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc)
+{
+ if (bc && res) {
+ QueueContext* qc = dynamic_cast<QueueContext*>(bc);
+ if (qc->m_q) {
+ if (res->errNo) {
+ // TODO: Handle async failure here
+ std::cerr << "Queue name=\"" << qc->m_q->m_name << "\": Operation " << qc->getOp() << ": failure "
+ << res->errNo << " (" << res->errMsg << ")" << std::endl;
+ } else {
+ // Handle async success here
+ switch(qc->m_op) {
+ case qpid::asyncStore::AsyncOperation::QUEUE_CREATE:
+ qc->m_q->createComplete(qc);
+ break;
+ case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH:
+ qc->m_q->flushComplete(qc);
+ break;
+ case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY:
+ qc->m_q->destroyComplete(qc);
+ break;
+ default:
+ std::ostringstream oss;
+ oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->m_op;
+ throw qpid::Exception(oss.str());
+ };
+ }
+ }
+ }
+ if (bc) delete bc;
+ if (res) delete res;
+}
+
+qpid::broker::QueueHandle&
+MockPersistableQueue::getHandle()
+{
+ return m_queueHandle;
+}
+
+void*
+MockPersistableQueue::runEnqueues()
+{
+ uint32_t numMsgs = 0;
+ uint16_t txnCnt = 0;
+ const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0;
+ MockTransactionContextPtr txn;
+ while (numMsgs < m_perfTestOpts.m_numMsgs) {
+ if (useTxn && txnCnt == 0) {
+ txn.reset(new MockTransactionContext(m_store)); // equivalent to begin()
+ }
+ MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true));
+ msg->setPersistenceId(m_store->getNextRid());
+ qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle);
+ MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, this);
+ if (useTxn) {
+ m_store->submitEnqueue(enqHandle,
+ txn->getHandle(),
+ &MockPersistableMessage::handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(msgCtxt));
+ } else {
+ m_store->submitEnqueue(enqHandle,
+ &MockPersistableMessage::handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(msgCtxt));
+ }
+ QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn));
+ push(qm);
+//std::cout << "**** push 0x" << std::hex << msg->getPersistenceId() << std::dec << std::endl;
+ if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ ++numMsgs;
+ }
+ if (txnCnt > 0) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ return 0;
+}
+
+void*
+MockPersistableQueue::runDequeues()
+{
+ uint32_t numMsgs = 0;
+ const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue;
+ uint16_t txnCnt = 0;
+ const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0;
+ MockTransactionContextPtr txn;
+ QueuedMessagePtr qm;
+ while (numMsgs < numMsgsToDequeue) {
+ if (useTxn && txnCnt == 0) {
+ txn.reset(new MockTransactionContext(m_store)); // equivalent to begin()
+ }
+ pop(qm);
+ if (qm.get()) {
+ qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle();
+ qpid::broker::BrokerContext* bc = new MockPersistableMessage::MessageContext(qm->getMessage(),
+ qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
+ this);
+ if (useTxn) {
+ m_store->submitDequeue(enqHandle,
+ txn->getHandle(),
+ &MockPersistableMessage::handleAsyncResult,
+ bc);
+ } else {
+ m_store->submitDequeue(enqHandle,
+ &MockPersistableMessage::handleAsyncResult,
+ bc);
+ }
+ ++numMsgs;
+ qm.reset(static_cast<QueuedMessage*>(0));
+ if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ } else {
+// ::usleep(100); // 0.1 ms TODO: Use a condition variable instead of sleeping/spinning
+ }
+ }
+ if (txnCnt > 0) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ return 0;
+}
+
+//static
+void*
+MockPersistableQueue::startEnqueues(void* ptr)
+{
+ return reinterpret_cast<MockPersistableQueue*>(ptr)->runEnqueues();
+}
+
+//static
+void*
+MockPersistableQueue::startDequeues(void* ptr)
+{
+ return reinterpret_cast<MockPersistableQueue*>(ptr)->runDequeues();
+}
+
+void
+MockPersistableQueue::encode(qpid::framing::Buffer& buffer) const
+{
+ buffer.putShortString(m_name);
+}
+
+uint32_t
+MockPersistableQueue::encodedSize() const
+{
+ return m_name.size() + 1;
+}
+
+uint64_t
+MockPersistableQueue::getPersistenceId() const
+{
+ return m_persistenceId;
+}
+
+void
+MockPersistableQueue::setPersistenceId(uint64_t persistenceId) const
+{
+ m_persistenceId = persistenceId;
+}
+
+void
+MockPersistableQueue::flush()
+{
+ //if(m_store) m_store->flush(*this);
+}
+
+const std::string&
+MockPersistableQueue::getName() const
+{
+ return m_name;
+}
+
+void
+MockPersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst)
+{
+ if (externalQueueStore != inst && externalQueueStore)
+ delete externalQueueStore;
+ externalQueueStore = inst;
+}
+
+uint64_t
+MockPersistableQueue::getSize()
+{
+ return m_persistableData.size();
+}
+
+void
+MockPersistableQueue::write(char* target)
+{
+ ::memcpy(target, m_persistableData.data(), m_persistableData.size());
+}
+
+// protected
+void
+MockPersistableQueue::createComplete(const QueueContext* /*qc*/)
+{
+//std::cout << "~~~~~ Queue name=\"" << m_name << "\": createComplete()" << std::endl;
+}
+
+// protected
+void
+MockPersistableQueue::flushComplete(const QueueContext* /*qc*/)
+{
+//std::cout << "~~~~~ Queue name=\"" << m_name << "\": flushComplete()" << std::endl;
+}
+
+// protected
+void
+MockPersistableQueue::destroyComplete(const QueueContext* /*qc*/)
+{
+//std::cout << "~~~~~ Queue name=\"" << m_name << "\": destroyComplete()" << std::endl;
+}
+
+// protected
+void
+MockPersistableQueue::push(QueuedMessagePtr& qm)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ m_enqueuedMsgs.push_back(qm);
+ m_dequeueCondition.notify();
+}
+
+// protected
+void
+MockPersistableQueue::pop(QueuedMessagePtr& qm)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ if (m_enqueuedMsgs.empty()) {
+ m_dequeueCondition.wait(m_enqueuedMsgsMutex);
+ }
+ qm = m_enqueuedMsgs.front();
+ if (qm->isTransactional()) {
+ // The next msg is still in an open transaction, skip and find next non-open-txn msg
+ MsgEnqListItr i = m_enqueuedMsgs.begin();
+ while (++i != m_enqueuedMsgs.end()) {
+ if (!(*i)->isTransactional()) {
+ qm = *i;
+ m_enqueuedMsgs.erase(i);
+ }
+ }
+ } else {
+ // The next msg is not in an open txn
+ m_enqueuedMsgs.pop_front();
+ }
+}
+
+}}} // namespace tests::storePerftools::asyncPerf