summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerfTools/asyncPerf
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/storePerfTools/asyncPerf')
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp178
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h105
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp345
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h133
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp219
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h101
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp175
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h81
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp77
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h61
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp79
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h60
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp63
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/TestResult.h93
14 files changed, 1770 insertions, 0 deletions
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp
new file mode 100644
index 0000000000..75fc921494
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockPersistableMessage.cpp
+ */
+
+#include "MockPersistableMessage.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+// --- Inner class Queue::MessageContext ---
+
+MockPersistableMessage::MessageContext::MessageContext(MockPersistableMessagePtr msg,
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ MockPersistableQueue* q) :
+ m_msg(msg),
+ m_op(op),
+ m_q(q)
+{}
+
+MockPersistableMessage::MessageContext::~MessageContext()
+{}
+
+const char*
+MockPersistableMessage::MessageContext::getOp() const
+{
+ return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
+}
+
+void
+MockPersistableMessage::MessageContext::destroy()
+{
+ delete this;
+}
+
+// --- Class MockPersistableMessage ---
+
+
+MockPersistableMessage::MockPersistableMessage(const char* msgData,
+ const uint32_t msgSize,
+ AsyncStoreImplPtr store,
+ const bool persistent) :
+ m_persistenceId(0ULL),
+ m_msg(msgData, static_cast<size_t>(msgSize)),
+ m_persistent(persistent),
+ m_msgHandle(store->createMessageHandle(this))
+{}
+
+MockPersistableMessage::~MockPersistableMessage()
+{}
+
+// static
+void
+MockPersistableMessage::handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc)
+{
+ if (bc) {
+ MessageContext* mc = dynamic_cast<MessageContext*>(bc);
+ if (mc->m_msg) {
+ if (res->errNo) {
+ // TODO: Handle async failure here
+ std::cerr << "Message pid=0x" << std::hex << mc->m_msg->m_persistenceId << std::dec << ": Operation "
+ << mc->getOp() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl;
+ } else {
+ // Handle async success here
+ switch(mc->m_op) {
+ case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE:
+ mc->m_msg->dequeueComplete(mc);
+ break;
+ case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE:
+ mc->m_msg->enqueueComplete(mc);
+ break;
+ default:
+ std::ostringstream oss;
+ oss << "tests::storePerftools::asyncPerf::MockPersistableMessage::handleAsyncResult(): Unknown async queue operation: " << mc->m_op;
+ throw qpid::Exception(oss.str());
+ };
+ }
+ }
+ }
+ if (bc) delete bc;
+ if (res) delete res;
+}
+
+qpid::broker::MessageHandle&
+MockPersistableMessage::getHandle()
+{
+ return m_msgHandle;
+}
+
+void
+MockPersistableMessage::setPersistenceId(uint64_t id) const
+{
+ m_persistenceId = id;
+}
+
+uint64_t
+MockPersistableMessage::getPersistenceId() const
+{
+ return m_persistenceId;
+}
+
+void
+MockPersistableMessage::encode(qpid::framing::Buffer& buffer) const
+{
+ buffer.putRawData(m_msg);
+}
+
+uint32_t
+MockPersistableMessage::encodedSize() const
+{
+ return static_cast<uint32_t>(m_msg.size());
+}
+
+void
+MockPersistableMessage::allDequeuesComplete()
+{}
+
+uint32_t
+MockPersistableMessage::encodedHeaderSize() const
+{
+ return 0;
+}
+
+bool
+MockPersistableMessage::isPersistent() const
+{
+ return m_persistent;
+}
+
+uint64_t
+MockPersistableMessage::getSize()
+{
+ return m_msg.size();
+}
+
+void
+MockPersistableMessage::write(char* target)
+{
+ ::memcpy(target, m_msg.data(), m_msg.size());
+}
+
+// protected
+void
+MockPersistableMessage::enqueueComplete(const MessageContext* /*mc*/)
+{
+//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl;
+}
+
+// protected
+void
+MockPersistableMessage::dequeueComplete(const MessageContext* /*mc*/)
+{
+//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h
new file mode 100644
index 0000000000..a139328690
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockPersistableMessage.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_MockPersistableMessage_h_
+#define tests_storePerfTools_asyncPerf_MockPersistableMessage_h_
+
+#include "qpid/asyncStore/AsyncOperation.h"
+#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
+#include "qpid/broker/BrokerContext.h"
+#include "qpid/broker/MessageHandle.h"
+#include "qpid/broker/PersistableMessage.h"
+
+#include <stdint.h> // uint32_t
+
+namespace qpid {
+namespace asyncStore {
+class AsyncStoreImpl;
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MockPersistableMessage;
+class MockPersistableQueue;
+
+typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
+typedef boost::shared_ptr<MockPersistableMessage> MockPersistableMessagePtr;
+
+class MockPersistableMessage: public qpid::broker::PersistableMessage, qpid::broker::DataSource
+{
+public:
+ class MessageContext : public qpid::broker::BrokerContext
+ {
+ public:
+ MessageContext(MockPersistableMessagePtr msg,
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ MockPersistableQueue* q);
+ virtual ~MessageContext();
+ const char* getOp() const;
+ void destroy();
+ MockPersistableMessagePtr m_msg;
+ const qpid::asyncStore::AsyncOperation::opCode m_op;
+ MockPersistableQueue* m_q;
+ };
+
+ MockPersistableMessage(const char* msgData,
+ const uint32_t msgSize,
+ AsyncStoreImplPtr store,
+ const bool persistent = true);
+ virtual ~MockPersistableMessage();
+ static void handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc);
+ qpid::broker::MessageHandle& getHandle();
+
+ // Interface Persistable
+ virtual void setPersistenceId(uint64_t id) const;
+ virtual uint64_t getPersistenceId() const;
+ virtual void encode(qpid::framing::Buffer& buffer) const;
+ virtual uint32_t encodedSize() const;
+
+ // Interface PersistableMessage
+ virtual void allDequeuesComplete();
+ virtual uint32_t encodedHeaderSize() const;
+ virtual bool isPersistent() const;
+
+ // Interface DataStore
+ virtual uint64_t getSize();
+ virtual void write(char* target);
+
+protected:
+ mutable uint64_t m_persistenceId;
+ const std::string m_msg;
+ const bool m_persistent;
+ qpid::broker::MessageHandle m_msgHandle;
+
+ // --- Ascnc op completions (called through handleAsyncResult) ---
+ void enqueueComplete(const MessageContext* mc);
+ void dequeueComplete(const MessageContext* mc);
+
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_MockPersistableMessage_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
new file mode 100644
index 0000000000..ff9b76c421
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockPersistableQueue.cpp
+ */
+
+#include "MockPersistableQueue.h"
+
+#include "MockPersistableMessage.h"
+#include "MockTransactionContext.h"
+#include "QueuedMessage.h"
+#include "TestOptions.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/broker/EnqueueHandle.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+// --- Inner class MockPersistableQueue::QueueContext ---
+
+MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueue* q,
+ const qpid::asyncStore::AsyncOperation::opCode op) :
+ m_q(q),
+ m_op(op)
+{}
+
+MockPersistableQueue::QueueContext::~QueueContext()
+{}
+
+const char*
+MockPersistableQueue::QueueContext::getOp() const
+{
+ return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
+}
+
+void
+MockPersistableQueue::QueueContext::destroy()
+{
+ delete this;
+}
+
+// --- Class MockPersistableQueue ---
+
+MockPersistableQueue::MockPersistableQueue(const std::string& name,
+ const qpid::framing::FieldTable& /*args*/,
+ AsyncStoreImplPtr store,
+ const TestOptions& to,
+ const char* msgData) :
+ m_name(name),
+ m_store(store),
+ m_persistenceId(0ULL),
+ m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this.
+ m_perfTestOpts(to),
+ m_msgData(msgData)
+{
+ const qpid::types::Variant::Map qo;
+ m_queueHandle = m_store->createQueueHandle(m_name, qo);
+ qpid::broker::BrokerContext* bc = new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE);
+ m_store->submitCreate(m_queueHandle,
+ dynamic_cast<const qpid::broker::DataSource*>(this),
+ &handleAsyncResult,
+ bc);
+}
+
+MockPersistableQueue::~MockPersistableQueue()
+{
+// m_store->flush(*this);
+ // TODO: Make destroying the store a test parameter
+// m_store->destroy(*this);
+// m_store = 0;
+}
+
+// static
+void
+MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc)
+{
+ if (bc && res) {
+ QueueContext* qc = dynamic_cast<QueueContext*>(bc);
+ if (qc->m_q) {
+ if (res->errNo) {
+ // TODO: Handle async failure here
+ std::cerr << "Queue name=\"" << qc->m_q->m_name << "\": Operation " << qc->getOp() << ": failure "
+ << res->errNo << " (" << res->errMsg << ")" << std::endl;
+ } else {
+ // Handle async success here
+ switch(qc->m_op) {
+ case qpid::asyncStore::AsyncOperation::QUEUE_CREATE:
+ qc->m_q->createComplete(qc);
+ break;
+ case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH:
+ qc->m_q->flushComplete(qc);
+ break;
+ case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY:
+ qc->m_q->destroyComplete(qc);
+ break;
+ default:
+ std::ostringstream oss;
+ oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->m_op;
+ throw qpid::Exception(oss.str());
+ };
+ }
+ }
+ }
+ if (bc) delete bc;
+ if (res) delete res;
+}
+
+qpid::broker::QueueHandle&
+MockPersistableQueue::getHandle()
+{
+ return m_queueHandle;
+}
+
+void*
+MockPersistableQueue::runEnqueues()
+{
+ uint32_t numMsgs = 0;
+ uint16_t txnCnt = 0;
+ const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0;
+ MockTransactionContextPtr txn;
+ while (numMsgs < m_perfTestOpts.m_numMsgs) {
+ if (useTxn && txnCnt == 0) {
+ txn.reset(new MockTransactionContext(m_store)); // equivalent to begin()
+ }
+ MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true));
+ msg->setPersistenceId(m_store->getNextRid());
+ qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle);
+ MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, this);
+ if (useTxn) {
+ m_store->submitEnqueue(enqHandle,
+ txn->getHandle(),
+ &MockPersistableMessage::handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(msgCtxt));
+ } else {
+ m_store->submitEnqueue(enqHandle,
+ &MockPersistableMessage::handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(msgCtxt));
+ }
+ QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn));
+ push(qm);
+//std::cout << "**** push 0x" << std::hex << msg->getPersistenceId() << std::dec << std::endl;
+ if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ ++numMsgs;
+ }
+ if (txnCnt > 0) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ return 0;
+}
+
+void*
+MockPersistableQueue::runDequeues()
+{
+ uint32_t numMsgs = 0;
+ const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue;
+ uint16_t txnCnt = 0;
+ const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0;
+ MockTransactionContextPtr txn;
+ QueuedMessagePtr qm;
+ while (numMsgs < numMsgsToDequeue) {
+ if (useTxn && txnCnt == 0) {
+ txn.reset(new MockTransactionContext(m_store)); // equivalent to begin()
+ }
+ pop(qm);
+ if (qm.get()) {
+ qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle();
+ qpid::broker::BrokerContext* bc = new MockPersistableMessage::MessageContext(qm->getMessage(),
+ qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
+ this);
+ if (useTxn) {
+ m_store->submitDequeue(enqHandle,
+ txn->getHandle(),
+ &MockPersistableMessage::handleAsyncResult,
+ bc);
+ } else {
+ m_store->submitDequeue(enqHandle,
+ &MockPersistableMessage::handleAsyncResult,
+ bc);
+ }
+ ++numMsgs;
+ qm.reset(static_cast<QueuedMessage*>(0));
+ if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ } else {
+// ::usleep(100); // 0.1 ms TODO: Use a condition variable instead of sleeping/spinning
+ }
+ }
+ if (txnCnt > 0) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ return 0;
+}
+
+//static
+void*
+MockPersistableQueue::startEnqueues(void* ptr)
+{
+ return reinterpret_cast<MockPersistableQueue*>(ptr)->runEnqueues();
+}
+
+//static
+void*
+MockPersistableQueue::startDequeues(void* ptr)
+{
+ return reinterpret_cast<MockPersistableQueue*>(ptr)->runDequeues();
+}
+
+void
+MockPersistableQueue::encode(qpid::framing::Buffer& buffer) const
+{
+ buffer.putShortString(m_name);
+}
+
+uint32_t
+MockPersistableQueue::encodedSize() const
+{
+ return m_name.size() + 1;
+}
+
+uint64_t
+MockPersistableQueue::getPersistenceId() const
+{
+ return m_persistenceId;
+}
+
+void
+MockPersistableQueue::setPersistenceId(uint64_t persistenceId) const
+{
+ m_persistenceId = persistenceId;
+}
+
+void
+MockPersistableQueue::flush()
+{
+ //if(m_store) m_store->flush(*this);
+}
+
+const std::string&
+MockPersistableQueue::getName() const
+{
+ return m_name;
+}
+
+void
+MockPersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst)
+{
+ if (externalQueueStore != inst && externalQueueStore)
+ delete externalQueueStore;
+ externalQueueStore = inst;
+}
+
+uint64_t
+MockPersistableQueue::getSize()
+{
+ return m_persistableData.size();
+}
+
+void
+MockPersistableQueue::write(char* target)
+{
+ ::memcpy(target, m_persistableData.data(), m_persistableData.size());
+}
+
+// protected
+void
+MockPersistableQueue::createComplete(const QueueContext* /*qc*/)
+{
+//std::cout << "~~~~~ Queue name=\"" << m_name << "\": createComplete()" << std::endl;
+}
+
+// protected
+void
+MockPersistableQueue::flushComplete(const QueueContext* /*qc*/)
+{
+//std::cout << "~~~~~ Queue name=\"" << m_name << "\": flushComplete()" << std::endl;
+}
+
+// protected
+void
+MockPersistableQueue::destroyComplete(const QueueContext* /*qc*/)
+{
+//std::cout << "~~~~~ Queue name=\"" << m_name << "\": destroyComplete()" << std::endl;
+}
+
+// protected
+void
+MockPersistableQueue::push(QueuedMessagePtr& qm)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ m_enqueuedMsgs.push_back(qm);
+ m_dequeueCondition.notify();
+}
+
+// protected
+void
+MockPersistableQueue::pop(QueuedMessagePtr& qm)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ if (m_enqueuedMsgs.empty()) {
+ m_dequeueCondition.wait(m_enqueuedMsgsMutex);
+ }
+ qm = m_enqueuedMsgs.front();
+ if (qm->isTransactional()) {
+ // The next msg is still in an open transaction, skip and find next non-open-txn msg
+ MsgEnqListItr i = m_enqueuedMsgs.begin();
+ while (++i != m_enqueuedMsgs.end()) {
+ if (!(*i)->isTransactional()) {
+ qm = *i;
+ m_enqueuedMsgs.erase(i);
+ }
+ }
+ } else {
+ // The next msg is not in an open txn
+ m_enqueuedMsgs.pop_front();
+ }
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h
new file mode 100644
index 0000000000..a77f41743c
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockPersistableQueue.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_MockPersistableQueue_h_
+#define tests_storePerfTools_asyncPerf_MockPersistableQueue_h_
+
+#include "qpid/asyncStore/AsyncOperation.h"
+#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
+#include "qpid/broker/BrokerContext.h"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/QueueHandle.h"
+#include "qpid/sys/Condition.h"
+#include "qpid/sys/Mutex.h"
+
+#include <boost/shared_ptr.hpp>
+#include <deque>
+
+namespace qpid {
+namespace asyncStore {
+class AsyncStoreImpl;
+}
+namespace framing {
+class FieldTable;
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class QueuedMessage;
+class TestOptions;
+
+typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
+typedef boost::shared_ptr<QueuedMessage> QueuedMessagePtr;
+
+class MockPersistableQueue : public qpid::broker::PersistableQueue, qpid::broker::DataSource
+{
+public:
+ class QueueContext : public qpid::broker::BrokerContext
+ {
+ public:
+ QueueContext(MockPersistableQueue* q,
+ const qpid::asyncStore::AsyncOperation::opCode op);
+ virtual ~QueueContext();
+ const char* getOp() const;
+ void destroy();
+ MockPersistableQueue* m_q;
+ const qpid::asyncStore::AsyncOperation::opCode m_op;
+ };
+
+ MockPersistableQueue(const std::string& name,
+ const qpid::framing::FieldTable& args,
+ AsyncStoreImplPtr store,
+ const TestOptions& perfTestParams,
+ const char* msgData);
+ virtual ~MockPersistableQueue();
+
+ // --- Async functionality ---
+ static void handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc);
+ qpid::broker::QueueHandle& getHandle();
+
+ // --- Performance test thread entry points ---
+ void* runEnqueues();
+ void* runDequeues();
+ static void* startEnqueues(void* ptr);
+ static void* startDequeues(void* ptr);
+
+ // --- Interface qpid::broker::Persistable ---
+ virtual void encode(qpid::framing::Buffer& buffer) const;
+ virtual uint32_t encodedSize() const;
+ virtual uint64_t getPersistenceId() const;
+ virtual void setPersistenceId(uint64_t persistenceId) const;
+
+ // --- Interface qpid::broker::PersistableQueue ---
+ virtual void flush();
+ virtual const std::string& getName() const;
+ virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst);
+
+ // --- Interface DataStore ---
+ virtual uint64_t getSize();
+ virtual void write(char* target);
+
+protected:
+ const std::string m_name;
+ AsyncStoreImplPtr m_store;
+ mutable uint64_t m_persistenceId;
+ std::string m_persistableData;
+ qpid::broker::QueueHandle m_queueHandle;
+
+ // Test params
+ const TestOptions& m_perfTestOpts;
+ const char* m_msgData;
+
+ typedef std::deque<QueuedMessagePtr> MsgEnqList;
+ typedef MsgEnqList::iterator MsgEnqListItr;
+ MsgEnqList m_enqueuedMsgs;
+ qpid::sys::Mutex m_enqueuedMsgsMutex;
+ qpid::sys::Condition m_dequeueCondition;
+
+ // --- Ascnc op completions (called through handleAsyncResult) ---
+ void createComplete(const QueueContext* qc);
+ void flushComplete(const QueueContext* qc);
+ void destroyComplete(const QueueContext* qc);
+
+ // --- Queue functionality ---
+ void push(QueuedMessagePtr& msg);
+ void pop(QueuedMessagePtr& msg);
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_MockPersistableQueue_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp
new file mode 100644
index 0000000000..e564876daa
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockTransactionContext.cpp
+ */
+
+#include "MockTransactionContext.h"
+
+#include "QueuedMessage.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+// --- Inner class MockTransactionContext::QueueContext ---
+
+MockTransactionContext::TransactionContext::TransactionContext(MockTransactionContext* tc,
+ const qpid::asyncStore::AsyncOperation::opCode op) :
+ m_tc(tc),
+ m_op(op)
+{}
+
+MockTransactionContext::TransactionContext::~TransactionContext()
+{}
+
+const char*
+MockTransactionContext::TransactionContext::getOp() const
+{
+ return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
+}
+
+void
+MockTransactionContext::TransactionContext::destroy()
+{
+ delete this;
+}
+
+// --- Class MockTransactionContext ---
+
+
+MockTransactionContext::MockTransactionContext(AsyncStoreImplPtr store,
+ const std::string& xid) :
+ m_store(store),
+ m_txnHandle(store->createTxnHandle(xid)),
+ m_prepared(false),
+ m_enqueuedMsgs()
+{
+//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl;
+}
+
+MockTransactionContext::~MockTransactionContext()
+{}
+
+// static
+void
+MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc)
+{
+ if (bc && res) {
+ TransactionContext* tc = dynamic_cast<TransactionContext*>(bc);
+ if (tc->m_tc) {
+ if (res->errNo) {
+ // TODO: Handle async failure here
+ std::cerr << "Transaction xid=\"" << tc->m_tc->getXid() << "\": Operation " << tc->getOp() << ": failure "
+ << res->errNo << " (" << res->errMsg << ")" << std::endl;
+ } else {
+ // Handle async success here
+ switch(tc->m_op) {
+ case qpid::asyncStore::AsyncOperation::TXN_PREPARE:
+ tc->m_tc->prepareComplete(tc);
+ break;
+ case qpid::asyncStore::AsyncOperation::TXN_COMMIT:
+ tc->m_tc->commitComplete(tc);
+ break;
+ case qpid::asyncStore::AsyncOperation::TXN_ABORT:
+ tc->m_tc->abortComplete(tc);
+ break;
+ default:
+ std::ostringstream oss;
+ oss << "tests::storePerftools::asyncPerf::MockTransactionContext::handleAsyncResult(): Unknown async operation: " << tc->m_op;
+ throw qpid::Exception(oss.str());
+ };
+ }
+ }
+ }
+ if (bc) delete bc;
+ if (res) delete res;
+}
+
+qpid::broker::TxnHandle&
+MockTransactionContext::getHandle()
+{
+ return m_txnHandle;
+}
+
+bool
+MockTransactionContext::is2pc() const
+{
+ return m_txnHandle.is2pc();
+}
+
+const std::string&
+MockTransactionContext::getXid() const
+{
+ return m_txnHandle.getXid();
+}
+
+void
+MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ m_enqueuedMsgs.push_back(qm);
+}
+
+void
+MockTransactionContext::prepare()
+{
+ if (m_txnHandle.is2pc()) {
+ localPrepare();
+ m_prepared = true;
+ }
+ std::ostringstream oss;
+ oss << "MockTransactionContext::prepare(): xid=\"" << getXid()
+ << "\": Transaction Error: called prepare() on local transaction";
+ throw qpid::Exception(oss.str());
+}
+
+void
+MockTransactionContext::abort()
+{
+ // TODO: Check the following XA transaction semantics:
+ // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared.
+ if (!m_prepared) {
+ localPrepare();
+ }
+ m_store->submitAbort(m_txnHandle,
+ &handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT)));
+//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+void
+MockTransactionContext::commit()
+{
+ if (is2pc()) {
+ if (!m_prepared) {
+ std::ostringstream oss;
+ oss << "MockTransactionContext::abort(): xid=\"" << getXid()
+ << "\": Transaction Error: called commit() without prepare() on 2PC transaction";
+ throw qpid::Exception(oss.str());
+ }
+ } else {
+ localPrepare();
+ }
+ m_store->submitCommit(m_txnHandle,
+ &handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT)));
+//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+
+// protected
+void
+MockTransactionContext::localPrepare()
+{
+ m_store->submitPrepare(m_txnHandle,
+ &handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE)));
+//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+// protected
+void
+MockTransactionContext::prepareComplete(const TransactionContext* /*tc*/)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ while (!m_enqueuedMsgs.empty()) {
+ m_enqueuedMsgs.front()->clearTransaction();
+ m_enqueuedMsgs.pop_front();
+ }
+//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": prepareComplete()" << std::endl;
+}
+
+
+// protected
+void
+MockTransactionContext::abortComplete(const TransactionContext* /*tc*/)
+{
+//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": abortComplete()" << std::endl;
+}
+
+
+// protected
+void
+MockTransactionContext::commitComplete(const TransactionContext* /*tc*/)
+{
+//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": commitComplete()" << std::endl;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h
new file mode 100644
index 0000000000..35de7374c5
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockTransactionContext.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_MockTransactionContext_h_
+#define tests_storePerfTools_asyncPerf_MockTransactionContext_h_
+
+#include "qpid/asyncStore/AsyncOperation.h"
+
+#include "qpid/broker/BrokerContext.h"
+#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext
+#include "qpid/broker/TxnHandle.h"
+#include "qpid/sys/Mutex.h"
+
+#include <boost/shared_ptr.hpp>
+#include <deque>
+
+namespace qpid {
+namespace asyncStore {
+class AsyncStoreImpl;
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class QueuedMessage;
+
+typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
+
+class MockTransactionContext : public qpid::broker::TransactionContext
+{
+public:
+ // NOTE: TransactionContext - Bad naming? This context is the async return handling context for class
+ // MockTransactionContext async ops. Other classes using this pattern simply use XXXContext for this class
+ // (e.g. QueueContext in MockPersistableQueue), but in this case it may be confusing.
+ class TransactionContext : public qpid::broker::BrokerContext
+ {
+ public:
+ TransactionContext(MockTransactionContext* tc,
+ const qpid::asyncStore::AsyncOperation::opCode op);
+ virtual ~TransactionContext();
+ const char* getOp() const;
+ void destroy();
+ MockTransactionContext* m_tc;
+ const qpid::asyncStore::AsyncOperation::opCode m_op;
+ };
+
+ MockTransactionContext(AsyncStoreImplPtr store,
+ const std::string& xid = std::string());
+ virtual ~MockTransactionContext();
+ static void handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc);
+
+ qpid::broker::TxnHandle& getHandle();
+ bool is2pc() const;
+ const std::string& getXid() const;
+ void addEnqueuedMsg(QueuedMessage* qm);
+
+ void prepare();
+ void abort();
+ void commit();
+
+protected:
+ AsyncStoreImplPtr m_store;
+ qpid::broker::TxnHandle m_txnHandle;
+ bool m_prepared;
+ std::deque<QueuedMessage*> m_enqueuedMsgs;
+ qpid::sys::Mutex m_enqueuedMsgsMutex;
+
+ void localPrepare();
+
+ // --- Ascnc op completions (called through handleAsyncResult) ---
+ void prepareComplete(const TransactionContext* tc);
+ void abortComplete(const TransactionContext* tc);
+ void commitComplete(const TransactionContext* tc);
+
+};
+
+}}} // namespace tests:storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_MockTransactionContext_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp
new file mode 100644
index 0000000000..fe66604774
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file PerfTest.cpp
+ */
+
+#include "PerfTest.h"
+
+#include "MockPersistableQueue.h"
+
+#include "tests/storePerfTools/version.h"
+#include "tests/storePerfTools/common/ScopedTimer.h"
+#include "tests/storePerfTools/common/Thread.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+
+#include <iomanip>
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+PerfTest::PerfTest(const TestOptions& to,
+ const qpid::asyncStore::AsyncStoreOptions& aso) :
+ m_testOpts(to),
+ m_storeOpts(aso),
+ m_testResult(to),
+ m_msgData(new char[to.m_msgSize]),
+ m_poller(new qpid::sys::Poller),
+ m_pollingThread(m_poller.get())
+{
+ std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize);
+}
+
+PerfTest::~PerfTest()
+{
+ m_poller->shutdown();
+ m_pollingThread.join();
+ delete[] m_msgData;
+}
+
+AsyncStoreImplPtr
+PerfTest::prepareStore()
+{
+ AsyncStoreImplPtr store(new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts));
+ store->initialize();
+ return store;
+}
+
+void
+PerfTest::prepareQueues(std::deque<MockPersistableQueuePtr>& jrnlList,
+ AsyncStoreImplPtr store)
+{
+ for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
+ std::ostringstream qname;
+ qname << "queue_" << std::setw(4) << std::setfill('0') << i;
+ MockPersistableQueuePtr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, store, m_testOpts, m_msgData));
+ jrnlList.push_back(mpq);
+ }
+}
+
+void
+PerfTest::destroyQueues(std::deque<MockPersistableQueuePtr>& jrnlList)
+{
+ jrnlList.clear();
+}
+
+void
+PerfTest::run()
+{
+ typedef boost::shared_ptr<tests::storePerftools::common::Thread> ThreadPtr; // TODO - replace with qpid threads
+
+ AsyncStoreImplPtr store = prepareStore();
+
+ std::deque<MockPersistableQueuePtr> queueList;
+ prepareQueues(queueList, store);
+
+ std::deque<ThreadPtr> threads;
+ { // --- Start of timed section ---
+ tests::storePerftools::common::ScopedTimer st(m_testResult);
+
+ for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) {
+ for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads
+ ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startEnqueues,
+ reinterpret_cast<void*>(queueList[q].get())));
+ threads.push_back(tp);
+ }
+ for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads
+ ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startDequeues,
+ reinterpret_cast<void*>(queueList[q].get())));
+ threads.push_back(tp);
+ }
+ }
+ while (threads.size()) {
+ threads.front()->join();
+ threads.pop_front();
+ }
+ } // --- End of timed section ---
+
+ destroyQueues(queueList);
+// DEBUG MEASURE - REMOVE WHEN FIXED
+//::sleep(2);
+}
+
+void
+PerfTest::toStream(std::ostream& os) const
+{
+ m_testOpts.printVals(os);
+ os << std::endl;
+ m_storeOpts.printVals(os);
+ os << std::endl;
+ os << m_testResult << std::endl;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+// -----------------------------------------------------------------
+
+int
+main(int argc, char** argv)
+{
+ qpid::CommonOptions co;
+ qpid::asyncStore::AsyncStoreOptions aso;
+ tests::storePerftools::asyncPerf::TestOptions to;
+ qpid::Options opts;
+ opts.add(co).add(aso).add(to);
+ try {
+ opts.parse(argc, argv);
+ aso.validate();
+ to.validate();
+ }
+ catch (std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ return 1;
+ }
+
+ // Handle options that just print information then exit.
+ if (co.version) {
+ std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl;
+ return 0;
+ }
+ if (co.help) {
+ std::cout << tests::storePerftools::name() << ": asyncPerf" << std::endl;
+ std::cout << "Performance test for the async store through the qpid async store interface." << std::endl;
+ std::cout << "Usage: asyncPerf [options]" << std::endl;
+ std::cout << opts << std::endl;
+ return 0;
+ }
+
+ // Create and start test
+ tests::storePerftools::asyncPerf::PerfTest apt(to, aso);
+ apt.run();
+
+ // Print test result
+ std::cout << apt << std::endl;
+ ::sleep(1);
+ return 0;
+}
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h
new file mode 100644
index 0000000000..1eb11a51fa
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file PerfTest.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_PerfTest_h_
+#define tests_storePerfTools_asyncPerf_PerfTest_h_
+
+#include "TestResult.h"
+
+#include "tests/storePerfTools/common/Streamable.h"
+
+#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Thread.h"
+
+#include <boost/shared_ptr.hpp>
+#include <deque>
+
+namespace qpid {
+namespace asyncStore {
+class AsyncStoreImpl;
+class AsyncStoreOptions;
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MockPersistableQueue;
+class TestOptions;
+
+typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
+typedef boost::shared_ptr<MockPersistableQueue> MockPersistableQueuePtr;
+
+class PerfTest : public tests::storePerftools::common::Streamable
+{
+public:
+ PerfTest(const TestOptions& to,
+ const qpid::asyncStore::AsyncStoreOptions& aso);
+ virtual ~PerfTest();
+ void run();
+ void toStream(std::ostream& os = std::cout) const;
+
+protected:
+ const TestOptions& m_testOpts;
+ const qpid::asyncStore::AsyncStoreOptions& m_storeOpts;
+ TestResult m_testResult;
+ qpid::framing::FieldTable m_queueArgs;
+ const char* m_msgData;
+ boost::shared_ptr<qpid::sys::Poller> m_poller;
+ qpid::sys::Thread m_pollingThread;
+
+ AsyncStoreImplPtr prepareStore();
+ void prepareQueues(std::deque<MockPersistableQueuePtr>& jrnlList,
+ AsyncStoreImplPtr store);
+ void destroyQueues(std::deque<MockPersistableQueuePtr>& jrnlList);
+
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_PerfTest_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp
new file mode 100644
index 0000000000..315e202d8b
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file QueuedMessage.cpp
+ */
+
+#include "QueuedMessage.h"
+
+#include "MockTransactionContext.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+QueuedMessage::QueuedMessage(MockPersistableMessagePtr msg,
+ qpid::broker::EnqueueHandle& enqHandle,
+ MockTransactionContextPtr txn) :
+ m_msg(msg),
+ m_enqHandle(enqHandle),
+ m_txn(txn)
+{
+ if (txn) {
+ txn->addEnqueuedMsg(this);
+ }
+}
+
+QueuedMessage::~QueuedMessage()
+{}
+
+MockPersistableMessagePtr
+QueuedMessage::getMessage() const
+{
+ return m_msg;
+}
+
+qpid::broker::EnqueueHandle
+QueuedMessage::getEnqueueHandle() const
+{
+ return m_enqHandle;
+}
+
+MockTransactionContextPtr
+QueuedMessage::getTransactionContext() const
+{
+ return m_txn;
+}
+
+bool
+QueuedMessage::isTransactional() const
+{
+ return m_txn.get() != 0;
+}
+
+void
+QueuedMessage::clearTransaction()
+{
+ m_txn.reset(static_cast<MockTransactionContext*>(0));
+}
+
+}}} // namespace tests::storePerfTools
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h
new file mode 100644
index 0000000000..4b3dab67f9
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file QueuedMessage.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_QueuedMessage_h_
+#define tests_storePerfTools_asyncPerf_QueuedMessage_h_
+
+#include "qpid/broker/EnqueueHandle.h"
+#include <boost/shared_ptr.hpp>
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MockPersistableMessage;
+class MockTransactionContext;
+
+typedef boost::shared_ptr<MockPersistableMessage> MockPersistableMessagePtr;
+typedef boost::shared_ptr<MockTransactionContext> MockTransactionContextPtr;
+
+class QueuedMessage
+{
+public:
+ QueuedMessage(MockPersistableMessagePtr msg,
+ qpid::broker::EnqueueHandle& enqHandle,
+ MockTransactionContextPtr txn);
+ virtual ~QueuedMessage();
+ MockPersistableMessagePtr getMessage() const;
+ qpid::broker::EnqueueHandle getEnqueueHandle() const;
+ MockTransactionContextPtr getTransactionContext() const;
+ bool isTransactional() const;
+ void clearTransaction();
+
+protected:
+ MockPersistableMessagePtr m_msg;
+ qpid::broker::EnqueueHandle m_enqHandle;
+ MockTransactionContextPtr m_txn;
+};
+
+}}} // namespace tests::storePerfTools
+
+#endif // tests_storePerfTools_asyncPerf_QueuedMessage_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp
new file mode 100644
index 0000000000..27784ef661
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestOptions.cpp
+ */
+
+#include "TestOptions.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+// static declarations
+uint16_t TestOptions::s_defaultEnqTxnBlkSize = 0;
+uint16_t TestOptions::s_defaultDeqTxnBlkSize = 0;
+
+TestOptions::TestOptions(const std::string& name) :
+ tests::storePerftools::common::TestOptions(name),
+ m_enqTxnBlockSize(s_defaultEnqTxnBlkSize),
+ m_deqTxnBlockSize(s_defaultDeqTxnBlkSize)
+{
+ doAddOptions();
+}
+
+TestOptions::TestOptions(const uint32_t numMsgs,
+ const uint32_t msgSize,
+ const uint16_t numQueues,
+ const uint16_t numEnqThreadsPerQueue,
+ const uint16_t numDeqThreadsPerQueue,
+ const uint16_t enqTxnBlockSize,
+ const uint16_t deqTxnBlockSize,
+ const std::string& name) :
+ tests::storePerftools::common::TestOptions(numMsgs, msgSize, numQueues, numEnqThreadsPerQueue, numDeqThreadsPerQueue, name),
+ m_enqTxnBlockSize(enqTxnBlockSize),
+ m_deqTxnBlockSize(deqTxnBlockSize)
+{
+ doAddOptions();
+}
+
+TestOptions::~TestOptions()
+{}
+
+void
+TestOptions::printVals(std::ostream& os) const
+{
+ tests::storePerftools::common::TestOptions::printVals(os);
+ os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl;
+ os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl;
+}
+
+void
+TestOptions::doAddOptions()
+{
+ addOptions()
+ ("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"),
+ "Num enqueus per transaction (0 = no transactions)")
+ ("deq-txn-size,d", qpid::optValue(m_deqTxnBlockSize, "N"),
+ "Num dequeues per transaction (0 = no transactions)")
+ ;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h
new file mode 100644
index 0000000000..b0e1e4ce74
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestOptions.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_TestOptions_h_
+#define tests_storePerfTools_asyncPerf_TestOptions_h_
+
+#include "tests/storePerfTools/common/TestOptions.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class TestOptions : public tests::storePerftools::common::TestOptions
+{
+public:
+ TestOptions(const std::string& name="Test Options");
+ TestOptions(const uint32_t numMsgs,
+ const uint32_t msgSize,
+ const uint16_t numQueues,
+ const uint16_t numEnqThreadsPerQueue,
+ const uint16_t numDeqThreadsPerQueue,
+ const uint16_t enqTxnBlockSize,
+ const uint16_t deqTxnBlockSize,
+ const std::string& name="Test Options");
+ virtual ~TestOptions();
+ void printVals(std::ostream& os) const;
+
+ uint16_t m_enqTxnBlockSize; ///< Transaction block size for enqueues
+ uint16_t m_deqTxnBlockSize; ///< Transaction block size for dequeues
+
+protected:
+ static uint16_t s_defaultEnqTxnBlkSize; ///< Default transaction block size for enqueues
+ static uint16_t s_defaultDeqTxnBlkSize; ///< Default transaction block size for dequeues
+
+ void doAddOptions();
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_TestOptions_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp
new file mode 100644
index 0000000000..cf6f293494
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestResult.cpp
+ */
+
+#include "TestResult.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+TestResult::TestResult(const TestOptions& to) :
+ tests::storePerftools::common::TestResult(),
+ m_testOpts(to)
+{}
+
+TestResult::~TestResult()
+{}
+
+void
+TestResult::toStream(std::ostream& os) const
+{
+ double msgsRate;
+ os << "TEST RESULTS:" << std::endl;
+ os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl;
+ os << " Msg size: " << m_testOpts.m_msgSize << std::endl;
+ os << " No. queues: " << m_testOpts.m_numQueues << std::endl;
+ os << " No. enq threads/queue: " << m_testOpts.m_numEnqThreadsPerQueue << std::endl;
+ os << " No. deq threads/queue: " << m_testOpts.m_numDeqThreadsPerQueue << std::endl;
+ os << " Time taken: " << m_elapsed << " sec" << std::endl;
+ uint32_t msgsPerQueue = m_testOpts.m_numMsgs * m_testOpts.m_numEnqThreadsPerQueue;
+ if (m_testOpts.m_numQueues > 1) {
+ msgsRate = double(msgsPerQueue) / m_elapsed;
+ os << " No. msgs per queue: " << msgsPerQueue << std::endl;
+ os << "Per queue msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl;
+ os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl;
+ }
+ uint32_t totalMsgs = msgsPerQueue * m_testOpts.m_numQueues;
+ msgsRate = double(totalMsgs) / m_elapsed;
+ os << " Total no. msgs: " << totalMsgs << std::endl;
+ os << " Broker msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl;
+ os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h
new file mode 100644
index 0000000000..1310689ff8
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestResult.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_TestResult_h_
+#define tests_storePerfTools_asyncPerf_TestResult_h_
+
+#include "TestOptions.h"
+
+#include "tests/storePerfTools/common/TestResult.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class TestOptions;
+
+/**
+ * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal.
+ *
+ * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the
+ * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the
+ * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput.
+ * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor.
+ *
+ * Results are available through the use of toStream(), toString() or the << operators.
+ *
+ * Output is in the following format:
+ * <pre>
+ * TEST RESULTS:
+ * Msgs per thread: 10000
+ * Msg size: 2048
+ * No. queues: 2
+ * No. threads/queue: 2
+ * Time taken: 1.6626 sec
+ * Total no. msgs: 40000
+ * Msg throughput: 24.0587 kMsgs/sec
+ * 49.2723 MB/sec
+ * </pre>
+ */
+class TestResult : public tests::storePerftools::common::TestResult
+{
+public:
+ /**
+ * \brief Constructor
+ *
+ * Constructor. Will start the time interval measurement.
+ *
+ * \param tp Test parameter details used to calculate the performance results.
+ */
+ TestResult(const TestOptions& to);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~TestResult();
+
+ /**
+ * \brief Stream the performance test results to an output stream
+ *
+ * Convenience feature which streams a multi-line performance result an output stream.
+ *
+ * \param os Output stream to which the results are to be streamed
+ */
+ void toStream(std::ostream& os = std::cout) const;
+
+protected:
+ TestOptions m_testOpts; ///< Test parameters used for performance calculations
+
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_TestResult_h_