summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-09 14:19:28 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-09 14:19:28 +0000
commit2e3690efe94e6161d129ebce2f8c22cb25819ec1 (patch)
tree8fc2b2aa701127d7ee827319b809aeaf543f2a59 /cpp/src/tests
parent633c33f224f3196f3f9bd80bd2e418d8143fea06 (diff)
downloadqpid-python-2e3690efe94e6161d129ebce2f8c22cb25819ec1.tar.gz
QPID-3858: Initial checkin of async interface implementation and test harness.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1336220 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/CMakeLists.txt71
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp178
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h105
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp345
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h133
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp219
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h101
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp175
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h81
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp77
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h61
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp79
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h60
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp63
-rw-r--r--cpp/src/tests/storePerfTools/asyncPerf/TestResult.h93
-rw-r--r--cpp/src/tests/storePerfTools/common/Parameters.cpp33
-rw-r--r--cpp/src/tests/storePerfTools/common/Parameters.h43
-rw-r--r--cpp/src/tests/storePerfTools/common/PerftoolError.cpp202
-rw-r--r--cpp/src/tests/storePerfTools/common/PerftoolError.h127
-rw-r--r--cpp/src/tests/storePerfTools/common/ScopedTimable.cpp43
-rw-r--r--cpp/src/tests/storePerfTools/common/ScopedTimable.h56
-rw-r--r--cpp/src/tests/storePerfTools/common/ScopedTimer.cpp58
-rw-r--r--cpp/src/tests/storePerfTools/common/ScopedTimer.h91
-rw-r--r--cpp/src/tests/storePerfTools/common/Streamable.cpp54
-rw-r--r--cpp/src/tests/storePerfTools/common/Streamable.h79
-rw-r--r--cpp/src/tests/storePerfTools/common/TestOptions.cpp103
-rw-r--r--cpp/src/tests/storePerfTools/common/TestOptions.h66
-rw-r--r--cpp/src/tests/storePerfTools/common/TestParameters.cpp135
-rw-r--r--cpp/src/tests/storePerfTools/common/TestParameters.h115
-rw-r--r--cpp/src/tests/storePerfTools/common/TestResult.cpp38
-rw-r--r--cpp/src/tests/storePerfTools/common/TestResult.h44
-rw-r--r--cpp/src/tests/storePerfTools/common/Thread.cpp67
-rw-r--r--cpp/src/tests/storePerfTools/common/Thread.h84
-rw-r--r--cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp218
-rw-r--r--cpp/src/tests/storePerfTools/jrnlPerf/Journal.h169
-rw-r--r--cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp166
-rw-r--r--cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h131
-rw-r--r--cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp241
-rw-r--r--cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h135
-rw-r--r--cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp65
-rw-r--r--cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h92
-rw-r--r--cpp/src/tests/storePerfTools/version.h49
42 files changed, 4545 insertions, 0 deletions
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index 5979ce42ae..965e4b2b38 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -343,3 +343,74 @@ add_library (dlclose_noop MODULE dlclose_noop.c)
#EXTRA_DIST+=$(LONG_TESTS) run_perftest
#check-long:
# $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
+
+
+# Async Store perf tests
+# ----------------------
+
+# New journal perf test (jrnl2Perf)
+set (jrnl2Perf_SOURCES
+ storePerfTools/jrnlPerf/Journal.cpp
+ storePerfTools/jrnlPerf/JournalParameters.cpp
+ storePerfTools/jrnlPerf/PerfTest.cpp
+ storePerfTools/jrnlPerf/TestResult.cpp
+
+ storePerfTools/common/Parameters.cpp
+ storePerfTools/common/PerftoolError.cpp
+ storePerfTools/common/ScopedTimable.cpp
+ storePerfTools/common/ScopedTimer.cpp
+ storePerfTools/common/Streamable.cpp
+ storePerfTools/common/TestParameters.cpp
+ storePerfTools/common/TestResult.cpp
+ storePerfTools/common/Thread.cpp
+)
+
+if (UNIX)
+ add_executable (jrnl2Perf ${jrnl2Perf_SOURCES})
+ set_target_properties (jrnl2Perf PROPERTIES
+ COMPILE_FLAGS "-DJOURNAL2"
+ LINK_FLAGS "-L${QPID_BUILD_DIR}/src"
+ )
+ target_link_libraries (jrnl2Perf
+ asyncStore
+ qpidbroker
+ rt
+ )
+endif (UNIX)
+
+# Async store perf test (asyncPerf)
+set (asyncPerf_SOURCES
+ storePerfTools/asyncPerf/MockPersistableMessage.cpp
+ storePerfTools/asyncPerf/MockPersistableQueue.cpp
+ storePerfTools/asyncPerf/MockTransactionContext.cpp
+ storePerfTools/asyncPerf/PerfTest.cpp
+ storePerfTools/asyncPerf/QueuedMessage.cpp
+ storePerfTools/asyncPerf/TestOptions.cpp
+ storePerfTools/asyncPerf/TestResult.cpp
+
+ storePerfTools/common/Parameters.cpp
+ storePerfTools/common/PerftoolError.cpp
+ storePerfTools/common/ScopedTimable.cpp
+ storePerfTools/common/ScopedTimer.cpp
+ storePerfTools/common/Streamable.cpp
+ storePerfTools/common/TestOptions.cpp
+ storePerfTools/common/TestParameters.cpp
+ storePerfTools/common/TestResult.cpp
+ storePerfTools/common/Thread.cpp
+)
+
+if (UNIX)
+ add_executable (asyncPerf ${asyncPerf_SOURCES})
+ set_target_properties (asyncPerf PROPERTIES
+ COMPILE_FLAGS "-DJOURNAL2"
+ LINK_FLAGS "-L${QPID_BUILD_DIR}/src"
+ )
+ target_link_libraries (asyncPerf
+ boost_program_options
+ asyncStore
+ qpidbroker
+ qpidcommon
+ qpidtypes
+ rt
+ )
+endif (UNIX)
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp
new file mode 100644
index 0000000000..75fc921494
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockPersistableMessage.cpp
+ */
+
+#include "MockPersistableMessage.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+// --- Inner class Queue::MessageContext ---
+
+MockPersistableMessage::MessageContext::MessageContext(MockPersistableMessagePtr msg,
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ MockPersistableQueue* q) :
+ m_msg(msg),
+ m_op(op),
+ m_q(q)
+{}
+
+MockPersistableMessage::MessageContext::~MessageContext()
+{}
+
+const char*
+MockPersistableMessage::MessageContext::getOp() const
+{
+ return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
+}
+
+void
+MockPersistableMessage::MessageContext::destroy()
+{
+ delete this;
+}
+
+// --- Class MockPersistableMessage ---
+
+
+MockPersistableMessage::MockPersistableMessage(const char* msgData,
+ const uint32_t msgSize,
+ AsyncStoreImplPtr store,
+ const bool persistent) :
+ m_persistenceId(0ULL),
+ m_msg(msgData, static_cast<size_t>(msgSize)),
+ m_persistent(persistent),
+ m_msgHandle(store->createMessageHandle(this))
+{}
+
+MockPersistableMessage::~MockPersistableMessage()
+{}
+
+// static
+void
+MockPersistableMessage::handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc)
+{
+ if (bc) {
+ MessageContext* mc = dynamic_cast<MessageContext*>(bc);
+ if (mc->m_msg) {
+ if (res->errNo) {
+ // TODO: Handle async failure here
+ std::cerr << "Message pid=0x" << std::hex << mc->m_msg->m_persistenceId << std::dec << ": Operation "
+ << mc->getOp() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl;
+ } else {
+ // Handle async success here
+ switch(mc->m_op) {
+ case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE:
+ mc->m_msg->dequeueComplete(mc);
+ break;
+ case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE:
+ mc->m_msg->enqueueComplete(mc);
+ break;
+ default:
+ std::ostringstream oss;
+ oss << "tests::storePerftools::asyncPerf::MockPersistableMessage::handleAsyncResult(): Unknown async queue operation: " << mc->m_op;
+ throw qpid::Exception(oss.str());
+ };
+ }
+ }
+ }
+ if (bc) delete bc;
+ if (res) delete res;
+}
+
+qpid::broker::MessageHandle&
+MockPersistableMessage::getHandle()
+{
+ return m_msgHandle;
+}
+
+void
+MockPersistableMessage::setPersistenceId(uint64_t id) const
+{
+ m_persistenceId = id;
+}
+
+uint64_t
+MockPersistableMessage::getPersistenceId() const
+{
+ return m_persistenceId;
+}
+
+void
+MockPersistableMessage::encode(qpid::framing::Buffer& buffer) const
+{
+ buffer.putRawData(m_msg);
+}
+
+uint32_t
+MockPersistableMessage::encodedSize() const
+{
+ return static_cast<uint32_t>(m_msg.size());
+}
+
+void
+MockPersistableMessage::allDequeuesComplete()
+{}
+
+uint32_t
+MockPersistableMessage::encodedHeaderSize() const
+{
+ return 0;
+}
+
+bool
+MockPersistableMessage::isPersistent() const
+{
+ return m_persistent;
+}
+
+uint64_t
+MockPersistableMessage::getSize()
+{
+ return m_msg.size();
+}
+
+void
+MockPersistableMessage::write(char* target)
+{
+ ::memcpy(target, m_msg.data(), m_msg.size());
+}
+
+// protected
+void
+MockPersistableMessage::enqueueComplete(const MessageContext* /*mc*/)
+{
+//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl;
+}
+
+// protected
+void
+MockPersistableMessage::dequeueComplete(const MessageContext* /*mc*/)
+{
+//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h
new file mode 100644
index 0000000000..a139328690
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockPersistableMessage.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_MockPersistableMessage_h_
+#define tests_storePerfTools_asyncPerf_MockPersistableMessage_h_
+
+#include "qpid/asyncStore/AsyncOperation.h"
+#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
+#include "qpid/broker/BrokerContext.h"
+#include "qpid/broker/MessageHandle.h"
+#include "qpid/broker/PersistableMessage.h"
+
+#include <stdint.h> // uint32_t
+
+namespace qpid {
+namespace asyncStore {
+class AsyncStoreImpl;
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MockPersistableMessage;
+class MockPersistableQueue;
+
+typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
+typedef boost::shared_ptr<MockPersistableMessage> MockPersistableMessagePtr;
+
+class MockPersistableMessage: public qpid::broker::PersistableMessage, qpid::broker::DataSource
+{
+public:
+ class MessageContext : public qpid::broker::BrokerContext
+ {
+ public:
+ MessageContext(MockPersistableMessagePtr msg,
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ MockPersistableQueue* q);
+ virtual ~MessageContext();
+ const char* getOp() const;
+ void destroy();
+ MockPersistableMessagePtr m_msg;
+ const qpid::asyncStore::AsyncOperation::opCode m_op;
+ MockPersistableQueue* m_q;
+ };
+
+ MockPersistableMessage(const char* msgData,
+ const uint32_t msgSize,
+ AsyncStoreImplPtr store,
+ const bool persistent = true);
+ virtual ~MockPersistableMessage();
+ static void handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc);
+ qpid::broker::MessageHandle& getHandle();
+
+ // Interface Persistable
+ virtual void setPersistenceId(uint64_t id) const;
+ virtual uint64_t getPersistenceId() const;
+ virtual void encode(qpid::framing::Buffer& buffer) const;
+ virtual uint32_t encodedSize() const;
+
+ // Interface PersistableMessage
+ virtual void allDequeuesComplete();
+ virtual uint32_t encodedHeaderSize() const;
+ virtual bool isPersistent() const;
+
+ // Interface DataStore
+ virtual uint64_t getSize();
+ virtual void write(char* target);
+
+protected:
+ mutable uint64_t m_persistenceId;
+ const std::string m_msg;
+ const bool m_persistent;
+ qpid::broker::MessageHandle m_msgHandle;
+
+ // --- Ascnc op completions (called through handleAsyncResult) ---
+ void enqueueComplete(const MessageContext* mc);
+ void dequeueComplete(const MessageContext* mc);
+
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_MockPersistableMessage_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
new file mode 100644
index 0000000000..ff9b76c421
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockPersistableQueue.cpp
+ */
+
+#include "MockPersistableQueue.h"
+
+#include "MockPersistableMessage.h"
+#include "MockTransactionContext.h"
+#include "QueuedMessage.h"
+#include "TestOptions.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/broker/EnqueueHandle.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+// --- Inner class MockPersistableQueue::QueueContext ---
+
+MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueue* q,
+ const qpid::asyncStore::AsyncOperation::opCode op) :
+ m_q(q),
+ m_op(op)
+{}
+
+MockPersistableQueue::QueueContext::~QueueContext()
+{}
+
+const char*
+MockPersistableQueue::QueueContext::getOp() const
+{
+ return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
+}
+
+void
+MockPersistableQueue::QueueContext::destroy()
+{
+ delete this;
+}
+
+// --- Class MockPersistableQueue ---
+
+MockPersistableQueue::MockPersistableQueue(const std::string& name,
+ const qpid::framing::FieldTable& /*args*/,
+ AsyncStoreImplPtr store,
+ const TestOptions& to,
+ const char* msgData) :
+ m_name(name),
+ m_store(store),
+ m_persistenceId(0ULL),
+ m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this.
+ m_perfTestOpts(to),
+ m_msgData(msgData)
+{
+ const qpid::types::Variant::Map qo;
+ m_queueHandle = m_store->createQueueHandle(m_name, qo);
+ qpid::broker::BrokerContext* bc = new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE);
+ m_store->submitCreate(m_queueHandle,
+ dynamic_cast<const qpid::broker::DataSource*>(this),
+ &handleAsyncResult,
+ bc);
+}
+
+MockPersistableQueue::~MockPersistableQueue()
+{
+// m_store->flush(*this);
+ // TODO: Make destroying the store a test parameter
+// m_store->destroy(*this);
+// m_store = 0;
+}
+
+// static
+void
+MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc)
+{
+ if (bc && res) {
+ QueueContext* qc = dynamic_cast<QueueContext*>(bc);
+ if (qc->m_q) {
+ if (res->errNo) {
+ // TODO: Handle async failure here
+ std::cerr << "Queue name=\"" << qc->m_q->m_name << "\": Operation " << qc->getOp() << ": failure "
+ << res->errNo << " (" << res->errMsg << ")" << std::endl;
+ } else {
+ // Handle async success here
+ switch(qc->m_op) {
+ case qpid::asyncStore::AsyncOperation::QUEUE_CREATE:
+ qc->m_q->createComplete(qc);
+ break;
+ case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH:
+ qc->m_q->flushComplete(qc);
+ break;
+ case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY:
+ qc->m_q->destroyComplete(qc);
+ break;
+ default:
+ std::ostringstream oss;
+ oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->m_op;
+ throw qpid::Exception(oss.str());
+ };
+ }
+ }
+ }
+ if (bc) delete bc;
+ if (res) delete res;
+}
+
+qpid::broker::QueueHandle&
+MockPersistableQueue::getHandle()
+{
+ return m_queueHandle;
+}
+
+void*
+MockPersistableQueue::runEnqueues()
+{
+ uint32_t numMsgs = 0;
+ uint16_t txnCnt = 0;
+ const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0;
+ MockTransactionContextPtr txn;
+ while (numMsgs < m_perfTestOpts.m_numMsgs) {
+ if (useTxn && txnCnt == 0) {
+ txn.reset(new MockTransactionContext(m_store)); // equivalent to begin()
+ }
+ MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true));
+ msg->setPersistenceId(m_store->getNextRid());
+ qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle);
+ MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, this);
+ if (useTxn) {
+ m_store->submitEnqueue(enqHandle,
+ txn->getHandle(),
+ &MockPersistableMessage::handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(msgCtxt));
+ } else {
+ m_store->submitEnqueue(enqHandle,
+ &MockPersistableMessage::handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(msgCtxt));
+ }
+ QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn));
+ push(qm);
+//std::cout << "**** push 0x" << std::hex << msg->getPersistenceId() << std::dec << std::endl;
+ if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ ++numMsgs;
+ }
+ if (txnCnt > 0) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ return 0;
+}
+
+void*
+MockPersistableQueue::runDequeues()
+{
+ uint32_t numMsgs = 0;
+ const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue;
+ uint16_t txnCnt = 0;
+ const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0;
+ MockTransactionContextPtr txn;
+ QueuedMessagePtr qm;
+ while (numMsgs < numMsgsToDequeue) {
+ if (useTxn && txnCnt == 0) {
+ txn.reset(new MockTransactionContext(m_store)); // equivalent to begin()
+ }
+ pop(qm);
+ if (qm.get()) {
+ qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle();
+ qpid::broker::BrokerContext* bc = new MockPersistableMessage::MessageContext(qm->getMessage(),
+ qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
+ this);
+ if (useTxn) {
+ m_store->submitDequeue(enqHandle,
+ txn->getHandle(),
+ &MockPersistableMessage::handleAsyncResult,
+ bc);
+ } else {
+ m_store->submitDequeue(enqHandle,
+ &MockPersistableMessage::handleAsyncResult,
+ bc);
+ }
+ ++numMsgs;
+ qm.reset(static_cast<QueuedMessage*>(0));
+ if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ } else {
+// ::usleep(100); // 0.1 ms TODO: Use a condition variable instead of sleeping/spinning
+ }
+ }
+ if (txnCnt > 0) {
+ txn->commit();
+ txnCnt = 0;
+ }
+ return 0;
+}
+
+//static
+void*
+MockPersistableQueue::startEnqueues(void* ptr)
+{
+ return reinterpret_cast<MockPersistableQueue*>(ptr)->runEnqueues();
+}
+
+//static
+void*
+MockPersistableQueue::startDequeues(void* ptr)
+{
+ return reinterpret_cast<MockPersistableQueue*>(ptr)->runDequeues();
+}
+
+void
+MockPersistableQueue::encode(qpid::framing::Buffer& buffer) const
+{
+ buffer.putShortString(m_name);
+}
+
+uint32_t
+MockPersistableQueue::encodedSize() const
+{
+ return m_name.size() + 1;
+}
+
+uint64_t
+MockPersistableQueue::getPersistenceId() const
+{
+ return m_persistenceId;
+}
+
+void
+MockPersistableQueue::setPersistenceId(uint64_t persistenceId) const
+{
+ m_persistenceId = persistenceId;
+}
+
+void
+MockPersistableQueue::flush()
+{
+ //if(m_store) m_store->flush(*this);
+}
+
+const std::string&
+MockPersistableQueue::getName() const
+{
+ return m_name;
+}
+
+void
+MockPersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst)
+{
+ if (externalQueueStore != inst && externalQueueStore)
+ delete externalQueueStore;
+ externalQueueStore = inst;
+}
+
+uint64_t
+MockPersistableQueue::getSize()
+{
+ return m_persistableData.size();
+}
+
+void
+MockPersistableQueue::write(char* target)
+{
+ ::memcpy(target, m_persistableData.data(), m_persistableData.size());
+}
+
+// protected
+void
+MockPersistableQueue::createComplete(const QueueContext* /*qc*/)
+{
+//std::cout << "~~~~~ Queue name=\"" << m_name << "\": createComplete()" << std::endl;
+}
+
+// protected
+void
+MockPersistableQueue::flushComplete(const QueueContext* /*qc*/)
+{
+//std::cout << "~~~~~ Queue name=\"" << m_name << "\": flushComplete()" << std::endl;
+}
+
+// protected
+void
+MockPersistableQueue::destroyComplete(const QueueContext* /*qc*/)
+{
+//std::cout << "~~~~~ Queue name=\"" << m_name << "\": destroyComplete()" << std::endl;
+}
+
+// protected
+void
+MockPersistableQueue::push(QueuedMessagePtr& qm)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ m_enqueuedMsgs.push_back(qm);
+ m_dequeueCondition.notify();
+}
+
+// protected
+void
+MockPersistableQueue::pop(QueuedMessagePtr& qm)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ if (m_enqueuedMsgs.empty()) {
+ m_dequeueCondition.wait(m_enqueuedMsgsMutex);
+ }
+ qm = m_enqueuedMsgs.front();
+ if (qm->isTransactional()) {
+ // The next msg is still in an open transaction, skip and find next non-open-txn msg
+ MsgEnqListItr i = m_enqueuedMsgs.begin();
+ while (++i != m_enqueuedMsgs.end()) {
+ if (!(*i)->isTransactional()) {
+ qm = *i;
+ m_enqueuedMsgs.erase(i);
+ }
+ }
+ } else {
+ // The next msg is not in an open txn
+ m_enqueuedMsgs.pop_front();
+ }
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h
new file mode 100644
index 0000000000..a77f41743c
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockPersistableQueue.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_MockPersistableQueue_h_
+#define tests_storePerfTools_asyncPerf_MockPersistableQueue_h_
+
+#include "qpid/asyncStore/AsyncOperation.h"
+#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
+#include "qpid/broker/BrokerContext.h"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/QueueHandle.h"
+#include "qpid/sys/Condition.h"
+#include "qpid/sys/Mutex.h"
+
+#include <boost/shared_ptr.hpp>
+#include <deque>
+
+namespace qpid {
+namespace asyncStore {
+class AsyncStoreImpl;
+}
+namespace framing {
+class FieldTable;
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class QueuedMessage;
+class TestOptions;
+
+typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
+typedef boost::shared_ptr<QueuedMessage> QueuedMessagePtr;
+
+class MockPersistableQueue : public qpid::broker::PersistableQueue, qpid::broker::DataSource
+{
+public:
+ class QueueContext : public qpid::broker::BrokerContext
+ {
+ public:
+ QueueContext(MockPersistableQueue* q,
+ const qpid::asyncStore::AsyncOperation::opCode op);
+ virtual ~QueueContext();
+ const char* getOp() const;
+ void destroy();
+ MockPersistableQueue* m_q;
+ const qpid::asyncStore::AsyncOperation::opCode m_op;
+ };
+
+ MockPersistableQueue(const std::string& name,
+ const qpid::framing::FieldTable& args,
+ AsyncStoreImplPtr store,
+ const TestOptions& perfTestParams,
+ const char* msgData);
+ virtual ~MockPersistableQueue();
+
+ // --- Async functionality ---
+ static void handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc);
+ qpid::broker::QueueHandle& getHandle();
+
+ // --- Performance test thread entry points ---
+ void* runEnqueues();
+ void* runDequeues();
+ static void* startEnqueues(void* ptr);
+ static void* startDequeues(void* ptr);
+
+ // --- Interface qpid::broker::Persistable ---
+ virtual void encode(qpid::framing::Buffer& buffer) const;
+ virtual uint32_t encodedSize() const;
+ virtual uint64_t getPersistenceId() const;
+ virtual void setPersistenceId(uint64_t persistenceId) const;
+
+ // --- Interface qpid::broker::PersistableQueue ---
+ virtual void flush();
+ virtual const std::string& getName() const;
+ virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst);
+
+ // --- Interface DataStore ---
+ virtual uint64_t getSize();
+ virtual void write(char* target);
+
+protected:
+ const std::string m_name;
+ AsyncStoreImplPtr m_store;
+ mutable uint64_t m_persistenceId;
+ std::string m_persistableData;
+ qpid::broker::QueueHandle m_queueHandle;
+
+ // Test params
+ const TestOptions& m_perfTestOpts;
+ const char* m_msgData;
+
+ typedef std::deque<QueuedMessagePtr> MsgEnqList;
+ typedef MsgEnqList::iterator MsgEnqListItr;
+ MsgEnqList m_enqueuedMsgs;
+ qpid::sys::Mutex m_enqueuedMsgsMutex;
+ qpid::sys::Condition m_dequeueCondition;
+
+ // --- Ascnc op completions (called through handleAsyncResult) ---
+ void createComplete(const QueueContext* qc);
+ void flushComplete(const QueueContext* qc);
+ void destroyComplete(const QueueContext* qc);
+
+ // --- Queue functionality ---
+ void push(QueuedMessagePtr& msg);
+ void pop(QueuedMessagePtr& msg);
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_MockPersistableQueue_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp
new file mode 100644
index 0000000000..e564876daa
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockTransactionContext.cpp
+ */
+
+#include "MockTransactionContext.h"
+
+#include "QueuedMessage.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+// --- Inner class MockTransactionContext::QueueContext ---
+
+MockTransactionContext::TransactionContext::TransactionContext(MockTransactionContext* tc,
+ const qpid::asyncStore::AsyncOperation::opCode op) :
+ m_tc(tc),
+ m_op(op)
+{}
+
+MockTransactionContext::TransactionContext::~TransactionContext()
+{}
+
+const char*
+MockTransactionContext::TransactionContext::getOp() const
+{
+ return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
+}
+
+void
+MockTransactionContext::TransactionContext::destroy()
+{
+ delete this;
+}
+
+// --- Class MockTransactionContext ---
+
+
+MockTransactionContext::MockTransactionContext(AsyncStoreImplPtr store,
+ const std::string& xid) :
+ m_store(store),
+ m_txnHandle(store->createTxnHandle(xid)),
+ m_prepared(false),
+ m_enqueuedMsgs()
+{
+//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl;
+}
+
+MockTransactionContext::~MockTransactionContext()
+{}
+
+// static
+void
+MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc)
+{
+ if (bc && res) {
+ TransactionContext* tc = dynamic_cast<TransactionContext*>(bc);
+ if (tc->m_tc) {
+ if (res->errNo) {
+ // TODO: Handle async failure here
+ std::cerr << "Transaction xid=\"" << tc->m_tc->getXid() << "\": Operation " << tc->getOp() << ": failure "
+ << res->errNo << " (" << res->errMsg << ")" << std::endl;
+ } else {
+ // Handle async success here
+ switch(tc->m_op) {
+ case qpid::asyncStore::AsyncOperation::TXN_PREPARE:
+ tc->m_tc->prepareComplete(tc);
+ break;
+ case qpid::asyncStore::AsyncOperation::TXN_COMMIT:
+ tc->m_tc->commitComplete(tc);
+ break;
+ case qpid::asyncStore::AsyncOperation::TXN_ABORT:
+ tc->m_tc->abortComplete(tc);
+ break;
+ default:
+ std::ostringstream oss;
+ oss << "tests::storePerftools::asyncPerf::MockTransactionContext::handleAsyncResult(): Unknown async operation: " << tc->m_op;
+ throw qpid::Exception(oss.str());
+ };
+ }
+ }
+ }
+ if (bc) delete bc;
+ if (res) delete res;
+}
+
+qpid::broker::TxnHandle&
+MockTransactionContext::getHandle()
+{
+ return m_txnHandle;
+}
+
+bool
+MockTransactionContext::is2pc() const
+{
+ return m_txnHandle.is2pc();
+}
+
+const std::string&
+MockTransactionContext::getXid() const
+{
+ return m_txnHandle.getXid();
+}
+
+void
+MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ m_enqueuedMsgs.push_back(qm);
+}
+
+void
+MockTransactionContext::prepare()
+{
+ if (m_txnHandle.is2pc()) {
+ localPrepare();
+ m_prepared = true;
+ }
+ std::ostringstream oss;
+ oss << "MockTransactionContext::prepare(): xid=\"" << getXid()
+ << "\": Transaction Error: called prepare() on local transaction";
+ throw qpid::Exception(oss.str());
+}
+
+void
+MockTransactionContext::abort()
+{
+ // TODO: Check the following XA transaction semantics:
+ // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared.
+ if (!m_prepared) {
+ localPrepare();
+ }
+ m_store->submitAbort(m_txnHandle,
+ &handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT)));
+//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+void
+MockTransactionContext::commit()
+{
+ if (is2pc()) {
+ if (!m_prepared) {
+ std::ostringstream oss;
+ oss << "MockTransactionContext::abort(): xid=\"" << getXid()
+ << "\": Transaction Error: called commit() without prepare() on 2PC transaction";
+ throw qpid::Exception(oss.str());
+ }
+ } else {
+ localPrepare();
+ }
+ m_store->submitCommit(m_txnHandle,
+ &handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT)));
+//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+
+// protected
+void
+MockTransactionContext::localPrepare()
+{
+ m_store->submitPrepare(m_txnHandle,
+ &handleAsyncResult,
+ dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE)));
+//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
+}
+
+// protected
+void
+MockTransactionContext::prepareComplete(const TransactionContext* /*tc*/)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
+ while (!m_enqueuedMsgs.empty()) {
+ m_enqueuedMsgs.front()->clearTransaction();
+ m_enqueuedMsgs.pop_front();
+ }
+//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": prepareComplete()" << std::endl;
+}
+
+
+// protected
+void
+MockTransactionContext::abortComplete(const TransactionContext* /*tc*/)
+{
+//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": abortComplete()" << std::endl;
+}
+
+
+// protected
+void
+MockTransactionContext::commitComplete(const TransactionContext* /*tc*/)
+{
+//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": commitComplete()" << std::endl;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h
new file mode 100644
index 0000000000..35de7374c5
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MockTransactionContext.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_MockTransactionContext_h_
+#define tests_storePerfTools_asyncPerf_MockTransactionContext_h_
+
+#include "qpid/asyncStore/AsyncOperation.h"
+
+#include "qpid/broker/BrokerContext.h"
+#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext
+#include "qpid/broker/TxnHandle.h"
+#include "qpid/sys/Mutex.h"
+
+#include <boost/shared_ptr.hpp>
+#include <deque>
+
+namespace qpid {
+namespace asyncStore {
+class AsyncStoreImpl;
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class QueuedMessage;
+
+typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
+
+class MockTransactionContext : public qpid::broker::TransactionContext
+{
+public:
+ // NOTE: TransactionContext - Bad naming? This context is the async return handling context for class
+ // MockTransactionContext async ops. Other classes using this pattern simply use XXXContext for this class
+ // (e.g. QueueContext in MockPersistableQueue), but in this case it may be confusing.
+ class TransactionContext : public qpid::broker::BrokerContext
+ {
+ public:
+ TransactionContext(MockTransactionContext* tc,
+ const qpid::asyncStore::AsyncOperation::opCode op);
+ virtual ~TransactionContext();
+ const char* getOp() const;
+ void destroy();
+ MockTransactionContext* m_tc;
+ const qpid::asyncStore::AsyncOperation::opCode m_op;
+ };
+
+ MockTransactionContext(AsyncStoreImplPtr store,
+ const std::string& xid = std::string());
+ virtual ~MockTransactionContext();
+ static void handleAsyncResult(const qpid::broker::AsyncResult* res,
+ qpid::broker::BrokerContext* bc);
+
+ qpid::broker::TxnHandle& getHandle();
+ bool is2pc() const;
+ const std::string& getXid() const;
+ void addEnqueuedMsg(QueuedMessage* qm);
+
+ void prepare();
+ void abort();
+ void commit();
+
+protected:
+ AsyncStoreImplPtr m_store;
+ qpid::broker::TxnHandle m_txnHandle;
+ bool m_prepared;
+ std::deque<QueuedMessage*> m_enqueuedMsgs;
+ qpid::sys::Mutex m_enqueuedMsgsMutex;
+
+ void localPrepare();
+
+ // --- Ascnc op completions (called through handleAsyncResult) ---
+ void prepareComplete(const TransactionContext* tc);
+ void abortComplete(const TransactionContext* tc);
+ void commitComplete(const TransactionContext* tc);
+
+};
+
+}}} // namespace tests:storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_MockTransactionContext_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp
new file mode 100644
index 0000000000..fe66604774
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file PerfTest.cpp
+ */
+
+#include "PerfTest.h"
+
+#include "MockPersistableQueue.h"
+
+#include "tests/storePerfTools/version.h"
+#include "tests/storePerfTools/common/ScopedTimer.h"
+#include "tests/storePerfTools/common/Thread.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+
+#include <iomanip>
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+PerfTest::PerfTest(const TestOptions& to,
+ const qpid::asyncStore::AsyncStoreOptions& aso) :
+ m_testOpts(to),
+ m_storeOpts(aso),
+ m_testResult(to),
+ m_msgData(new char[to.m_msgSize]),
+ m_poller(new qpid::sys::Poller),
+ m_pollingThread(m_poller.get())
+{
+ std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize);
+}
+
+PerfTest::~PerfTest()
+{
+ m_poller->shutdown();
+ m_pollingThread.join();
+ delete[] m_msgData;
+}
+
+AsyncStoreImplPtr
+PerfTest::prepareStore()
+{
+ AsyncStoreImplPtr store(new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts));
+ store->initialize();
+ return store;
+}
+
+void
+PerfTest::prepareQueues(std::deque<MockPersistableQueuePtr>& jrnlList,
+ AsyncStoreImplPtr store)
+{
+ for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
+ std::ostringstream qname;
+ qname << "queue_" << std::setw(4) << std::setfill('0') << i;
+ MockPersistableQueuePtr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, store, m_testOpts, m_msgData));
+ jrnlList.push_back(mpq);
+ }
+}
+
+void
+PerfTest::destroyQueues(std::deque<MockPersistableQueuePtr>& jrnlList)
+{
+ jrnlList.clear();
+}
+
+void
+PerfTest::run()
+{
+ typedef boost::shared_ptr<tests::storePerftools::common::Thread> ThreadPtr; // TODO - replace with qpid threads
+
+ AsyncStoreImplPtr store = prepareStore();
+
+ std::deque<MockPersistableQueuePtr> queueList;
+ prepareQueues(queueList, store);
+
+ std::deque<ThreadPtr> threads;
+ { // --- Start of timed section ---
+ tests::storePerftools::common::ScopedTimer st(m_testResult);
+
+ for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) {
+ for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads
+ ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startEnqueues,
+ reinterpret_cast<void*>(queueList[q].get())));
+ threads.push_back(tp);
+ }
+ for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads
+ ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startDequeues,
+ reinterpret_cast<void*>(queueList[q].get())));
+ threads.push_back(tp);
+ }
+ }
+ while (threads.size()) {
+ threads.front()->join();
+ threads.pop_front();
+ }
+ } // --- End of timed section ---
+
+ destroyQueues(queueList);
+// DEBUG MEASURE - REMOVE WHEN FIXED
+//::sleep(2);
+}
+
+void
+PerfTest::toStream(std::ostream& os) const
+{
+ m_testOpts.printVals(os);
+ os << std::endl;
+ m_storeOpts.printVals(os);
+ os << std::endl;
+ os << m_testResult << std::endl;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+// -----------------------------------------------------------------
+
+int
+main(int argc, char** argv)
+{
+ qpid::CommonOptions co;
+ qpid::asyncStore::AsyncStoreOptions aso;
+ tests::storePerftools::asyncPerf::TestOptions to;
+ qpid::Options opts;
+ opts.add(co).add(aso).add(to);
+ try {
+ opts.parse(argc, argv);
+ aso.validate();
+ to.validate();
+ }
+ catch (std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ return 1;
+ }
+
+ // Handle options that just print information then exit.
+ if (co.version) {
+ std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl;
+ return 0;
+ }
+ if (co.help) {
+ std::cout << tests::storePerftools::name() << ": asyncPerf" << std::endl;
+ std::cout << "Performance test for the async store through the qpid async store interface." << std::endl;
+ std::cout << "Usage: asyncPerf [options]" << std::endl;
+ std::cout << opts << std::endl;
+ return 0;
+ }
+
+ // Create and start test
+ tests::storePerftools::asyncPerf::PerfTest apt(to, aso);
+ apt.run();
+
+ // Print test result
+ std::cout << apt << std::endl;
+ ::sleep(1);
+ return 0;
+}
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h
new file mode 100644
index 0000000000..1eb11a51fa
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file PerfTest.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_PerfTest_h_
+#define tests_storePerfTools_asyncPerf_PerfTest_h_
+
+#include "TestResult.h"
+
+#include "tests/storePerfTools/common/Streamable.h"
+
+#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Thread.h"
+
+#include <boost/shared_ptr.hpp>
+#include <deque>
+
+namespace qpid {
+namespace asyncStore {
+class AsyncStoreImpl;
+class AsyncStoreOptions;
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MockPersistableQueue;
+class TestOptions;
+
+typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
+typedef boost::shared_ptr<MockPersistableQueue> MockPersistableQueuePtr;
+
+class PerfTest : public tests::storePerftools::common::Streamable
+{
+public:
+ PerfTest(const TestOptions& to,
+ const qpid::asyncStore::AsyncStoreOptions& aso);
+ virtual ~PerfTest();
+ void run();
+ void toStream(std::ostream& os = std::cout) const;
+
+protected:
+ const TestOptions& m_testOpts;
+ const qpid::asyncStore::AsyncStoreOptions& m_storeOpts;
+ TestResult m_testResult;
+ qpid::framing::FieldTable m_queueArgs;
+ const char* m_msgData;
+ boost::shared_ptr<qpid::sys::Poller> m_poller;
+ qpid::sys::Thread m_pollingThread;
+
+ AsyncStoreImplPtr prepareStore();
+ void prepareQueues(std::deque<MockPersistableQueuePtr>& jrnlList,
+ AsyncStoreImplPtr store);
+ void destroyQueues(std::deque<MockPersistableQueuePtr>& jrnlList);
+
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_PerfTest_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp
new file mode 100644
index 0000000000..315e202d8b
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file QueuedMessage.cpp
+ */
+
+#include "QueuedMessage.h"
+
+#include "MockTransactionContext.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+QueuedMessage::QueuedMessage(MockPersistableMessagePtr msg,
+ qpid::broker::EnqueueHandle& enqHandle,
+ MockTransactionContextPtr txn) :
+ m_msg(msg),
+ m_enqHandle(enqHandle),
+ m_txn(txn)
+{
+ if (txn) {
+ txn->addEnqueuedMsg(this);
+ }
+}
+
+QueuedMessage::~QueuedMessage()
+{}
+
+MockPersistableMessagePtr
+QueuedMessage::getMessage() const
+{
+ return m_msg;
+}
+
+qpid::broker::EnqueueHandle
+QueuedMessage::getEnqueueHandle() const
+{
+ return m_enqHandle;
+}
+
+MockTransactionContextPtr
+QueuedMessage::getTransactionContext() const
+{
+ return m_txn;
+}
+
+bool
+QueuedMessage::isTransactional() const
+{
+ return m_txn.get() != 0;
+}
+
+void
+QueuedMessage::clearTransaction()
+{
+ m_txn.reset(static_cast<MockTransactionContext*>(0));
+}
+
+}}} // namespace tests::storePerfTools
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h
new file mode 100644
index 0000000000..4b3dab67f9
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file QueuedMessage.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_QueuedMessage_h_
+#define tests_storePerfTools_asyncPerf_QueuedMessage_h_
+
+#include "qpid/broker/EnqueueHandle.h"
+#include <boost/shared_ptr.hpp>
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MockPersistableMessage;
+class MockTransactionContext;
+
+typedef boost::shared_ptr<MockPersistableMessage> MockPersistableMessagePtr;
+typedef boost::shared_ptr<MockTransactionContext> MockTransactionContextPtr;
+
+class QueuedMessage
+{
+public:
+ QueuedMessage(MockPersistableMessagePtr msg,
+ qpid::broker::EnqueueHandle& enqHandle,
+ MockTransactionContextPtr txn);
+ virtual ~QueuedMessage();
+ MockPersistableMessagePtr getMessage() const;
+ qpid::broker::EnqueueHandle getEnqueueHandle() const;
+ MockTransactionContextPtr getTransactionContext() const;
+ bool isTransactional() const;
+ void clearTransaction();
+
+protected:
+ MockPersistableMessagePtr m_msg;
+ qpid::broker::EnqueueHandle m_enqHandle;
+ MockTransactionContextPtr m_txn;
+};
+
+}}} // namespace tests::storePerfTools
+
+#endif // tests_storePerfTools_asyncPerf_QueuedMessage_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp
new file mode 100644
index 0000000000..27784ef661
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestOptions.cpp
+ */
+
+#include "TestOptions.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+// static declarations
+uint16_t TestOptions::s_defaultEnqTxnBlkSize = 0;
+uint16_t TestOptions::s_defaultDeqTxnBlkSize = 0;
+
+TestOptions::TestOptions(const std::string& name) :
+ tests::storePerftools::common::TestOptions(name),
+ m_enqTxnBlockSize(s_defaultEnqTxnBlkSize),
+ m_deqTxnBlockSize(s_defaultDeqTxnBlkSize)
+{
+ doAddOptions();
+}
+
+TestOptions::TestOptions(const uint32_t numMsgs,
+ const uint32_t msgSize,
+ const uint16_t numQueues,
+ const uint16_t numEnqThreadsPerQueue,
+ const uint16_t numDeqThreadsPerQueue,
+ const uint16_t enqTxnBlockSize,
+ const uint16_t deqTxnBlockSize,
+ const std::string& name) :
+ tests::storePerftools::common::TestOptions(numMsgs, msgSize, numQueues, numEnqThreadsPerQueue, numDeqThreadsPerQueue, name),
+ m_enqTxnBlockSize(enqTxnBlockSize),
+ m_deqTxnBlockSize(deqTxnBlockSize)
+{
+ doAddOptions();
+}
+
+TestOptions::~TestOptions()
+{}
+
+void
+TestOptions::printVals(std::ostream& os) const
+{
+ tests::storePerftools::common::TestOptions::printVals(os);
+ os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl;
+ os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl;
+}
+
+void
+TestOptions::doAddOptions()
+{
+ addOptions()
+ ("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"),
+ "Num enqueus per transaction (0 = no transactions)")
+ ("deq-txn-size,d", qpid::optValue(m_deqTxnBlockSize, "N"),
+ "Num dequeues per transaction (0 = no transactions)")
+ ;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h
new file mode 100644
index 0000000000..b0e1e4ce74
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestOptions.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_TestOptions_h_
+#define tests_storePerfTools_asyncPerf_TestOptions_h_
+
+#include "tests/storePerfTools/common/TestOptions.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class TestOptions : public tests::storePerftools::common::TestOptions
+{
+public:
+ TestOptions(const std::string& name="Test Options");
+ TestOptions(const uint32_t numMsgs,
+ const uint32_t msgSize,
+ const uint16_t numQueues,
+ const uint16_t numEnqThreadsPerQueue,
+ const uint16_t numDeqThreadsPerQueue,
+ const uint16_t enqTxnBlockSize,
+ const uint16_t deqTxnBlockSize,
+ const std::string& name="Test Options");
+ virtual ~TestOptions();
+ void printVals(std::ostream& os) const;
+
+ uint16_t m_enqTxnBlockSize; ///< Transaction block size for enqueues
+ uint16_t m_deqTxnBlockSize; ///< Transaction block size for dequeues
+
+protected:
+ static uint16_t s_defaultEnqTxnBlkSize; ///< Default transaction block size for enqueues
+ static uint16_t s_defaultDeqTxnBlkSize; ///< Default transaction block size for dequeues
+
+ void doAddOptions();
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_TestOptions_h_
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp
new file mode 100644
index 0000000000..cf6f293494
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestResult.cpp
+ */
+
+#include "TestResult.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+TestResult::TestResult(const TestOptions& to) :
+ tests::storePerftools::common::TestResult(),
+ m_testOpts(to)
+{}
+
+TestResult::~TestResult()
+{}
+
+void
+TestResult::toStream(std::ostream& os) const
+{
+ double msgsRate;
+ os << "TEST RESULTS:" << std::endl;
+ os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl;
+ os << " Msg size: " << m_testOpts.m_msgSize << std::endl;
+ os << " No. queues: " << m_testOpts.m_numQueues << std::endl;
+ os << " No. enq threads/queue: " << m_testOpts.m_numEnqThreadsPerQueue << std::endl;
+ os << " No. deq threads/queue: " << m_testOpts.m_numDeqThreadsPerQueue << std::endl;
+ os << " Time taken: " << m_elapsed << " sec" << std::endl;
+ uint32_t msgsPerQueue = m_testOpts.m_numMsgs * m_testOpts.m_numEnqThreadsPerQueue;
+ if (m_testOpts.m_numQueues > 1) {
+ msgsRate = double(msgsPerQueue) / m_elapsed;
+ os << " No. msgs per queue: " << msgsPerQueue << std::endl;
+ os << "Per queue msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl;
+ os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl;
+ }
+ uint32_t totalMsgs = msgsPerQueue * m_testOpts.m_numQueues;
+ msgsRate = double(totalMsgs) / m_elapsed;
+ os << " Total no. msgs: " << totalMsgs << std::endl;
+ os << " Broker msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl;
+ os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h
new file mode 100644
index 0000000000..1310689ff8
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestResult.h
+ */
+
+#ifndef tests_storePerfTools_asyncPerf_TestResult_h_
+#define tests_storePerfTools_asyncPerf_TestResult_h_
+
+#include "TestOptions.h"
+
+#include "tests/storePerfTools/common/TestResult.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class TestOptions;
+
+/**
+ * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal.
+ *
+ * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the
+ * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the
+ * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput.
+ * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor.
+ *
+ * Results are available through the use of toStream(), toString() or the << operators.
+ *
+ * Output is in the following format:
+ * <pre>
+ * TEST RESULTS:
+ * Msgs per thread: 10000
+ * Msg size: 2048
+ * No. queues: 2
+ * No. threads/queue: 2
+ * Time taken: 1.6626 sec
+ * Total no. msgs: 40000
+ * Msg throughput: 24.0587 kMsgs/sec
+ * 49.2723 MB/sec
+ * </pre>
+ */
+class TestResult : public tests::storePerftools::common::TestResult
+{
+public:
+ /**
+ * \brief Constructor
+ *
+ * Constructor. Will start the time interval measurement.
+ *
+ * \param tp Test parameter details used to calculate the performance results.
+ */
+ TestResult(const TestOptions& to);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~TestResult();
+
+ /**
+ * \brief Stream the performance test results to an output stream
+ *
+ * Convenience feature which streams a multi-line performance result an output stream.
+ *
+ * \param os Output stream to which the results are to be streamed
+ */
+ void toStream(std::ostream& os = std::cout) const;
+
+protected:
+ TestOptions m_testOpts; ///< Test parameters used for performance calculations
+
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerfTools_asyncPerf_TestResult_h_
diff --git a/cpp/src/tests/storePerfTools/common/Parameters.cpp b/cpp/src/tests/storePerfTools/common/Parameters.cpp
new file mode 100644
index 0000000000..8e4bafaf86
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/Parameters.cpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Parameters.cpp
+ */
+
+#include "Parameters.h"
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+Parameters::~Parameters()
+{}
+
+}}} // namespace tests::storePerftools::common
diff --git a/cpp/src/tests/storePerfTools/common/Parameters.h b/cpp/src/tests/storePerfTools/common/Parameters.h
new file mode 100644
index 0000000000..03dd2163f8
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/Parameters.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Parameters.h
+ */
+#ifndef tests_storePerfTools_common_Parameters_h_
+#define tests_storePerfTools_common_Parameters_h_
+
+#include "Streamable.h"
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+class Parameters: public Streamable
+{
+public:
+ virtual ~Parameters();
+ virtual bool parseArg(const int arg,
+ const char* optarg) = 0;
+
+};
+
+}}} // namespace tests::storePerfTools::common
+
+#endif // tests_storePerfTools_common_Parameters_h_
diff --git a/cpp/src/tests/storePerfTools/common/PerftoolError.cpp b/cpp/src/tests/storePerfTools/common/PerftoolError.cpp
new file mode 100644
index 0000000000..5bb61b6519
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/PerftoolError.cpp
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file PerftoolError.cpp
+ */
+
+#include "PerftoolError.h"
+
+#include <iomanip> // std::setfill(), std::setw()
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+// private
+PerftoolError::PerftoolError() :
+ std::runtime_error(std::string())
+{}
+
+PerftoolError::PerftoolError(const uint32_t errCode) throw () :
+ std::runtime_error(std::string()),
+ m_errCode(errCode)
+{
+ formatWhatStr();
+}
+
+PerftoolError::PerftoolError(const std::string& errMsg) throw () :
+ std::runtime_error(std::string()),
+ m_errCode(0),
+ m_errMsg(errMsg)
+{
+ formatWhatStr();
+}
+
+PerftoolError::PerftoolError(const uint32_t errCode,
+ const std::string& errMsg) throw () :
+ std::runtime_error(std::string()),
+ m_errCode(errCode),
+ m_errMsg(errMsg)
+{
+ formatWhatStr();
+}
+
+PerftoolError::PerftoolError(const uint32_t errCode,
+ const std::string& throwingClass,
+ const std::string& throwingFunction) throw () :
+ std::runtime_error(std::string()),
+ m_errCode(errCode),
+ m_throwingClass(throwingClass),
+ m_throwingFunction(throwingFunction)
+{
+ formatWhatStr();
+}
+
+PerftoolError::PerftoolError(const std::string& errMsg,
+ const std::string& throwingClass,
+ const std::string& throwingFunction) throw () :
+ std::runtime_error(std::string()),
+ m_errCode(0),
+ m_errMsg(errMsg),
+ m_throwingClass(throwingClass),
+ m_throwingFunction(throwingFunction)
+{
+ formatWhatStr();
+}
+
+PerftoolError::PerftoolError(const uint32_t errCode,
+ const std::string& errMsg,
+ const std::string& throwingClass,
+ const std::string& throwingFunction) throw () :
+ std::runtime_error(std::string()),
+ m_errCode(errCode),
+ m_errMsg(errMsg),
+ m_throwingClass(throwingClass),
+ m_throwingFunction(throwingFunction)
+{}
+
+PerftoolError::~PerftoolError() throw()
+{}
+
+const char*
+PerftoolError::what() const throw ()
+{
+ return m_what.c_str();
+}
+
+uint32_t
+PerftoolError::getErrorCode() const throw ()
+{
+ return m_errCode;
+}
+
+const std::string
+PerftoolError::getAdditionalInfo() const throw ()
+{
+ return m_errMsg;
+}
+
+const std::string
+PerftoolError::getThrowingClass() const throw ()
+{
+ return m_throwingClass;
+}
+
+const std::string
+PerftoolError::getThrowingFunction() const throw ()
+{
+ return m_throwingFunction;
+}
+
+void
+PerftoolError::toStream(std::ostream& os) const
+{
+ os << what();
+}
+
+// protected
+void
+PerftoolError::formatWhatStr() throw ()
+{
+ try {
+ const bool ai = !m_errMsg.empty();
+ const bool tc = !m_throwingClass.empty();
+ const bool tf = !m_throwingFunction.empty();
+ std::ostringstream oss;
+ oss << className() << " 0x" << std::hex << std::setfill('0') << std::setw(4) << m_errCode << " ";
+ if (tc) {
+ oss << m_throwingClass;
+ if (tf) {
+ oss << "::";
+ } else {
+ oss << " ";
+ }
+ }
+ if (tf) {
+ oss << m_throwingFunction << "() ";
+ }
+ if (tc || tf) {
+ oss << "threw " << s_errorMessage(m_errCode);
+ }
+ if (ai) {
+ oss << " (" << m_errMsg << ")";
+ }
+ m_what.assign(oss.str());
+ } catch (...) {}
+}
+
+// protected
+const char*
+PerftoolError::className()
+{
+ return s_className;
+}
+
+//static
+const char* PerftoolError::s_className = "PerftoolError";
+
+// --- Static definitions ---
+PerftoolError::errorMap_t PerftoolError::s_errorMap;
+PerftoolError::errorMapCitr_t PerftoolError::s_errorMapIterator;
+bool PerftoolError::s_initializedFlag = PerftoolError::s_initialize();
+
+// --- Generic and system errors ---
+const uint32_t PerftoolError::PERR_PTHREAD = 0x0001;
+
+// static
+const char*
+PerftoolError::s_errorMessage(const uint32_t err_no) throw ()
+{
+ s_errorMapIterator = s_errorMap.find(err_no);
+ if (s_errorMapIterator == s_errorMap.end())
+ return "<Unknown error code>";
+ return s_errorMapIterator->second;
+}
+
+// protected static
+bool
+PerftoolError::s_initialize()
+{
+ s_errorMap[PERR_PTHREAD] = "ERR_PTHREAD: pthread operation failure";
+
+ return true;
+}
+
+}}} // namespace tests::storePerftools::common
diff --git a/cpp/src/tests/storePerfTools/common/PerftoolError.h b/cpp/src/tests/storePerfTools/common/PerftoolError.h
new file mode 100644
index 0000000000..8f994d7d28
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/PerftoolError.h
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file PerftoolError.h
+ */
+
+#ifndef tests_storePerfTools_common_PerftoolError_h_
+#define tests_storePerfTools_common_PerftoolError_h_
+
+#include "Streamable.h"
+
+#include <map>
+#include <stdexcept> // std::runtime_error
+#include <stdint.h> // uint32_t
+
+// Macro definitions
+
+#include <cstring> // std::strerror()
+#include <sstream> // std::ostringstream
+
+/**
+ * \brief Macro to retrieve and format the C errno value as a string.
+ *
+ * \param errno Value of errno to be formatted.
+ */
+#define FORMAT_SYSERR(errno) " errno=" << errno << " (" << std::strerror(errno) << ")"
+
+/**
+ * \brief Macro to check for a clean pthread creation and throwing a JournalException with code JERR_PTHREAD if
+ * thread creation failed.
+ *
+ * \param err Value or errno.
+ * \param pfn Name of system call that failed.
+ * \param cls Name of class in which function failed.
+ * \param fn Name of class function that failed.
+ */
+#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \
+ std::ostringstream oss; \
+ oss << pfn << " failed: " << FORMAT_SYSERR(err); \
+ throw tests::storePerftools::common::PerftoolError(tests::storePerftools::common::PerftoolError::PERR_PTHREAD, oss.str(), cls, fn); \
+ }
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+class PerftoolError: public std::runtime_error, public Streamable
+{
+public:
+ // --- Constructors & destructors ---
+ PerftoolError(const uint32_t errCode) throw ();
+ PerftoolError(const std::string& errMsg) throw ();
+ PerftoolError(const uint32_t errCode,
+ const std::string& errMsg) throw ();
+ PerftoolError(const uint32_t errCode,
+ const std::string& throwingClass,
+ const std::string& throwingFunction) throw ();
+ PerftoolError(const std::string& errMsg,
+ const std::string& throwingClass,
+ const std::string& throwingFunction) throw ();
+ PerftoolError(const uint32_t errCode,
+ const std::string& errMsg,
+ const std::string& throwingClass,
+ const std::string& throwingFunction) throw ();
+ virtual ~PerftoolError() throw();
+
+ const char* what() const throw (); // overrides std::runtime_error::what()
+ uint32_t getErrorCode() const throw ();
+ const std::string getAdditionalInfo() const throw ();
+ const std::string getThrowingClass() const throw ();
+ const std::string getThrowingFunction() const throw ();
+
+ // --- Implementation of class Streamable ---
+ virtual void toStream(std::ostream& os = std::cout) const;
+
+ // --- Generic and system errors ---
+ static const uint32_t PERR_PTHREAD; ///< pthread operation failure
+
+
+ static const char* s_errorMessage(const uint32_t err_no) throw ();
+
+
+protected:
+ uint32_t m_errCode; ///< Error or failure code, taken from JournalErrors.
+ std::string m_errMsg; ///< Additional information pertaining to the error or failure.
+ std::string m_throwingClass; ///< Name of the class throwing the error.
+ std::string m_throwingFunction; ///< Name of the function throwing the error.
+ std::string m_what; ///< Standard error of failure message, taken from JournalErrors.
+
+ void formatWhatStr() throw ();
+ virtual const char* className();
+
+ typedef std::map<uint32_t, const char*> errorMap_t; ///< Type for map of error messages
+ typedef errorMap_t::const_iterator errorMapCitr_t; ///< Const iterator for map of error messages
+
+ static errorMap_t s_errorMap; ///< Map of error messages
+ static errorMapCitr_t s_errorMapIterator; ///< Const iterator
+
+private:
+ static const char* s_className; ///< Name of this class, used in formatting error messages.
+ static bool s_initializedFlag; ///< Dummy flag, used to initialize map.
+
+ PerftoolError();
+ static bool s_initialize(); ///< Static fn for initializing static data
+
+};
+
+}}} // namespace tests::stprePerftools::common
+
+#endif // tests_storePerfTools_common_PerftoolError_h_
diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimable.cpp b/cpp/src/tests/storePerfTools/common/ScopedTimable.cpp
new file mode 100644
index 0000000000..c2023b7854
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/ScopedTimable.cpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ScopedTimable.cpp
+ */
+
+#include "ScopedTimable.h"
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+ScopedTimable::ScopedTimable() :
+ m_elapsed(0.0)
+{}
+
+ScopedTimable::~ScopedTimable()
+{}
+
+double&
+ScopedTimable::getElapsedRef()
+{
+ return m_elapsed;
+}
+
+}}} // namespace tests::storePerftools::common
diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimable.h b/cpp/src/tests/storePerfTools/common/ScopedTimable.h
new file mode 100644
index 0000000000..3c21a4aafa
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/ScopedTimable.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ScopedTimable.h
+ */
+
+#ifndef tests_storePerfTools_common_ScopedTimable_h_
+#define tests_storePerfTools_common_ScopedTimable_h_
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+/**
+ * \brief Scoped timer class that starts timing on construction and finishes on destruction.
+ *
+ * This class is designed to be the parent class for a performance result class which depends on the elapsed
+ * time of some process or event. By passing this (or its subclasses) to ScopedTimer (which only exists within
+ * the scope of the event), the _elapsed member of this class will be written with the elapsed time when the
+ * ScopedTimer object goes out of scope or is destroyed.
+ *
+ * Subclasses may be aware of the parameters being timed, and may thus print and/or display performance and/or
+ * rate information for these parameters.
+ */
+class ScopedTimable
+{
+public:
+ ScopedTimable();
+ virtual ~ScopedTimable();
+ double& getElapsedRef();
+
+protected:
+ double m_elapsed; ///< Elapsed time, will be written on destruction of ScopedTimer instances
+
+};
+
+}}} // namespace tests::storePerftools::common
+
+#endif // tests_storePerfTools_common_ScopedTimable_h_
diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimer.cpp b/cpp/src/tests/storePerfTools/common/ScopedTimer.cpp
new file mode 100644
index 0000000000..8312174cad
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/ScopedTimer.cpp
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ScopedTimer.cpp
+ */
+
+#include "ScopedTimer.h"
+
+#include "ScopedTimable.h"
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+ScopedTimer::ScopedTimer(double& elapsed) :
+ m_elapsed(elapsed)
+{
+ ::clock_gettime(CLOCK_REALTIME, &m_startTime);
+}
+
+ScopedTimer::ScopedTimer(ScopedTimable& st) :
+ m_elapsed(st.getElapsedRef())
+{
+ ::clock_gettime(CLOCK_REALTIME, &m_startTime);
+}
+
+ScopedTimer::~ScopedTimer()
+{
+ ::timespec stopTime;
+ ::clock_gettime(CLOCK_REALTIME, &stopTime);
+ m_elapsed = _s_getDoubleTime(stopTime) - _s_getDoubleTime(m_startTime);
+}
+
+// static
+double ScopedTimer::_s_getDoubleTime(const ::timespec& ts)
+{
+ return ts.tv_sec + (double(ts.tv_nsec) / 1e9);
+}
+
+
+}}} // namespace tests::storePerftools::common
diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimer.h b/cpp/src/tests/storePerfTools/common/ScopedTimer.h
new file mode 100644
index 0000000000..c23d056f10
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/ScopedTimer.h
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file ScopedTimer.h
+ */
+
+#ifndef tests_storePerfTools_common_ScopedTimer_h_
+#define tests_storePerfTools_common_ScopedTimer_h_
+
+#include <ctime>
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+class ScopedTimable;
+
+/**
+ * \brief Scoped timer class that starts timing on construction and finishes on destruction.
+ *
+ * The scoped timer will take the current time on construction and again on destruction. The destructor
+ * will calculate the elapsed time from the difference between these two times and write the result
+ * as a double to the double ref supplied to the constructor. A second constructor will accept a class (or
+ * subclass) of ScopedTimable, which contains a double to which the result may be written and accessed at a
+ * later time.
+ */
+class ScopedTimer
+{
+public:
+ /**
+ * \brief Constructor
+ *
+ * Constructor which accepts a ref to a double. Will start the time interval measurement.
+ *
+ * \param elapsed A ref to a double which will contain the elapsed time in seconds after this class instance
+ * is destroyed.
+ */
+ ScopedTimer(double& elapsed);
+
+ /**
+ * \brief Constructor
+ *
+ * Constructor which accepts a ref to a ScopedTimable. Will start the time interval measurement.
+ *
+ * \param st A ref to a ScopedTimable into which the result of the ScopedTimer can be written.
+ */
+ ScopedTimer(ScopedTimable& st);
+
+ /**
+ * \brief Destructor
+ *
+ * Destructor. Will stop the time interval measurement and write the calculated elapsed time into _elapsed.
+ */
+ virtual ~ScopedTimer();
+
+protected:
+ double& m_elapsed; ///< Ref to elapsed time, will be written on destruction of ScopedTimer instances
+ ::timespec m_startTime; ///< Start time, set on construction
+
+ /**
+ * \brief Convert ::timespec to seconds
+ *
+ * Static function to convert a ::timespec struct into a double representation in seconds.
+ *
+ * \param ts std::timespec struct containing the time to be converted.
+ * \return A double which represents the time in parameter ts in seconds.
+ */
+ static double _s_getDoubleTime(const ::timespec& ts);
+
+};
+
+}}} // namespace tests::storePerftools::common
+
+#endif // tests_storePerfTools_common_ScopedTimer_h_
diff --git a/cpp/src/tests/storePerfTools/common/Streamable.cpp b/cpp/src/tests/storePerfTools/common/Streamable.cpp
new file mode 100644
index 0000000000..8c58f1c03e
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/Streamable.cpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Streamable.cpp
+ */
+
+#include "Streamable.h"
+
+#include <sstream>
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+std::string
+Streamable::toString() const
+{
+ std::ostringstream oss;
+ toStream(oss);
+ return oss.str();
+}
+
+std::ostream&
+operator<<(std::ostream& os, const Streamable& s)
+{
+ s.toStream(os);
+ return os;
+}
+
+std::ostream&
+operator<<(std::ostream& os, const Streamable* sPtr)
+{
+ sPtr->toStream(os);
+ return os;
+}
+
+}}} // namespace tests::storePerftools::common
diff --git a/cpp/src/tests/storePerfTools/common/Streamable.h b/cpp/src/tests/storePerfTools/common/Streamable.h
new file mode 100644
index 0000000000..504a3d97dd
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/Streamable.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Streamable.h
+ */
+
+#ifndef tests_storePerfTools_common_Streamable_h_
+#define tests_storePerfTools_common_Streamable_h_
+
+#include <iostream>
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+/**
+ * \brief Abstract class which provides the mechanisms to stream
+ *
+ * An abstract class which provides stream functions. The toStream() function must be implemented by subclasses,
+ * and is used by the remaining functions. For convenience, toString() returns a std::string object.
+ */
+class Streamable
+{
+public:
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~Streamable() {}
+
+ /***
+ * \brief Stream some representation of the object to an output stream
+ *
+ * \param os Output stream to which the class data is to be streamed
+ */
+ virtual void toStream(std::ostream& os = std::cout) const = 0;
+
+ /**
+ * \brief Creates a string representation of the test parameters
+ *
+ * Convenience feature which creates and returns a std::string object containing the content of toStream().
+ *
+ * \return Content of toStream()
+ */
+ std::string toString() const;
+
+ /**
+ * \brief Stream the object to an output stream
+ */
+ friend std::ostream& operator<<(std::ostream& os,
+ const Streamable& s);
+
+ /**
+ * \brief Stream the object to an output stream through an object pointer
+ */
+ friend std::ostream& operator<<(std::ostream& os,
+ const Streamable* sPtr);
+
+};
+
+}}} // namespace tests::storePerftools::common
+
+#endif // tests_storePerfTools_common_Streamable_h_
diff --git a/cpp/src/tests/storePerfTools/common/TestOptions.cpp b/cpp/src/tests/storePerfTools/common/TestOptions.cpp
new file mode 100644
index 0000000000..39e3434a6c
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/TestOptions.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestOptions.cpp
+ */
+
+#include "TestOptions.h"
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+// static declarations
+uint32_t TestOptions::s_defaultNumMsgs = 1024;
+uint32_t TestOptions::s_defaultMsgSize = 1024;
+uint16_t TestOptions::s_defaultNumQueues = 1;
+uint16_t TestOptions::s_defaultEnqThreadsPerQueue = 1;
+uint16_t TestOptions::s_defaultDeqThreadsPerQueue = 1;
+
+TestOptions::TestOptions(const std::string& name) :
+ qpid::Options(name),
+ m_numMsgs(s_defaultNumMsgs),
+ m_msgSize(s_defaultMsgSize),
+ m_numQueues(s_defaultNumQueues),
+ m_numEnqThreadsPerQueue(s_defaultEnqThreadsPerQueue),
+ m_numDeqThreadsPerQueue(s_defaultDeqThreadsPerQueue)
+{
+ doAddOptions();
+}
+
+TestOptions::TestOptions(const uint32_t numMsgs,
+ const uint32_t msgSize,
+ const uint16_t numQueues,
+ const uint16_t numEnqThreadsPerQueue,
+ const uint16_t numDeqThreadsPerQueue,
+ const std::string& name) :
+ qpid::Options(name),
+ m_numMsgs(numMsgs),
+ m_msgSize(msgSize),
+ m_numQueues(numQueues),
+ m_numEnqThreadsPerQueue(numEnqThreadsPerQueue),
+ m_numDeqThreadsPerQueue(numDeqThreadsPerQueue)
+{
+ doAddOptions();
+}
+
+TestOptions::~TestOptions()
+{}
+
+void
+TestOptions::printVals(std::ostream& os) const
+{
+ os << "TEST OPTIONS:" << std::endl;
+ os << " Number of queues [-q, --num-queues]: " << m_numQueues << std::endl;
+ os << " Number of producers per queue [-p, --num-producers]: " << m_numEnqThreadsPerQueue << std::endl;
+ os << " Number of consumers per queue [-c, --num-consumers]: " << m_numDeqThreadsPerQueue << std::endl;
+ os << " Number of messages to send per producer [-m, --num-msgs]: " << m_numMsgs << std::endl;
+ os << " Size of each message (bytes) [-s, --msg-size]: " << m_msgSize << std::endl;
+}
+
+void
+TestOptions::validate()
+{
+ if (((m_numEnqThreadsPerQueue * m_numMsgs) % m_numDeqThreadsPerQueue) != 0) {
+ throw qpid::Exception("Parameter Error: (num-producers * num-msgs) must be a multiple of num-consumers.");
+ }
+}
+
+void
+TestOptions::doAddOptions()
+{
+ addOptions()
+ ("num-queues,q", qpid::optValue(m_numQueues, "N"),
+ "Number of queues")
+ ("num-producers,p", qpid::optValue(m_numEnqThreadsPerQueue, "N"),
+ "Number of producers per queue")
+ ("num-consumers,c", qpid::optValue(m_numDeqThreadsPerQueue, "N"),
+ "Number of consumers per queue")
+ ("num-msgs,m", qpid::optValue(m_numMsgs, "N"),
+ "Number of messages to send per producer")
+ ("msg-size,s", qpid::optValue(m_msgSize, "N"),
+ "Size of each message (bytes)")
+ ;
+}
+
+}}} // namespace tests::storePerftools::common
diff --git a/cpp/src/tests/storePerfTools/common/TestOptions.h b/cpp/src/tests/storePerfTools/common/TestOptions.h
new file mode 100644
index 0000000000..6a4479fa56
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/TestOptions.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestOptions.h
+ */
+
+#ifndef tests_storePerfTools_common_TestOptions_h_
+#define tests_storePerfTools_common_TestOptions_h_
+
+#include "qpid/Options.h"
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+class TestOptions : public qpid::Options
+{
+public:
+ TestOptions(const std::string& name="Test Options");
+ TestOptions(const uint32_t numMsgs,
+ const uint32_t msgSize,
+ const uint16_t numQueues,
+ const uint16_t numEnqThreadsPerQueue,
+ const uint16_t numDeqThreadsPerQueue,
+ const std::string& name="Test Options");
+ virtual ~TestOptions();
+ void printVals(std::ostream& os) const;
+ void validate();
+
+ uint32_t m_numMsgs; ///< Number of messages to be sent
+ uint32_t m_msgSize; ///< Message size in bytes
+ uint16_t m_numQueues; ///< Number of queues to test simultaneously
+ uint16_t m_numEnqThreadsPerQueue; ///< Number of enqueue threads per queue
+ uint16_t m_numDeqThreadsPerQueue; ///< Number of dequeue threads per queue
+
+protected:
+ static uint32_t s_defaultNumMsgs; ///< Default number of messages to be sent
+ static uint32_t s_defaultMsgSize; ///< Default message size in bytes
+ static uint16_t s_defaultNumQueues; ///< Default number of queues to test simultaneously
+ static uint16_t s_defaultEnqThreadsPerQueue; ///< Default number of enqueue threads per queue
+ static uint16_t s_defaultDeqThreadsPerQueue; ///< Default number of dequeue threads per queue
+
+ void doAddOptions();
+
+};
+
+}}} // namespace tests::storePerftools::common
+
+#endif // tests_storePerfTools_common_TestOptions_h_
diff --git a/cpp/src/tests/storePerfTools/common/TestParameters.cpp b/cpp/src/tests/storePerfTools/common/TestParameters.cpp
new file mode 100644
index 0000000000..f36a2d3bda
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/TestParameters.cpp
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestParameters.cpp
+ */
+
+#include "TestParameters.h"
+
+#include <cstdlib> // std::atoi, std::atol
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+// static declarations
+uint32_t TestParameters::s_defaultNumMsgs = 1024;
+uint32_t TestParameters::s_defaultMsgSize = 1024;
+uint16_t TestParameters::s_defaultNumQueues = 1;
+uint16_t TestParameters::s_defaultEnqThreadsPerQueue = 1;
+uint16_t TestParameters::s_defaultDeqThreadsPerQueue = 1;
+
+TestParameters::TestParameters():
+ Parameters(),
+ m_numMsgs(s_defaultNumMsgs),
+ m_msgSize(s_defaultMsgSize),
+ m_numQueues(s_defaultNumQueues),
+ m_numEnqThreadsPerQueue(s_defaultEnqThreadsPerQueue),
+ m_numDeqThreadsPerQueue(s_defaultDeqThreadsPerQueue)//,
+{}
+
+TestParameters::TestParameters(const uint32_t numMsgs,
+ const uint32_t msgSize,
+ const uint16_t numQueues,
+ const uint16_t numEnqThreadsPerQueue,
+ const uint16_t numDeqThreadsPerQueue) :
+ Parameters(),
+ m_numMsgs(numMsgs),
+ m_msgSize(msgSize),
+ m_numQueues(numQueues),
+ m_numEnqThreadsPerQueue(numEnqThreadsPerQueue),
+ m_numDeqThreadsPerQueue(numDeqThreadsPerQueue)
+{}
+
+TestParameters::TestParameters(const TestParameters& tp):
+ Parameters(),
+ m_numMsgs(tp.m_numMsgs),
+ m_msgSize(tp.m_msgSize),
+ m_numQueues(tp.m_numQueues),
+ m_numEnqThreadsPerQueue(tp.m_numEnqThreadsPerQueue),
+ m_numDeqThreadsPerQueue(tp.m_numDeqThreadsPerQueue)
+{}
+
+TestParameters::~TestParameters()
+{}
+
+void
+TestParameters::toStream(std::ostream& os) const
+{
+ os << "Test Parameters:" << std::endl;
+ os << " num_msgs = " << m_numMsgs << std::endl;
+ os << " msg_size = " << m_msgSize << std::endl;
+ os << " num_queues = " << m_numQueues << std::endl;
+ os << " num_enq_threads_per_queue = " << m_numEnqThreadsPerQueue << std::endl;
+ os << " num_deq_threads_per_queue = " << m_numDeqThreadsPerQueue << std::endl;
+}
+
+bool
+TestParameters::parseArg(const int arg,
+ const char* optarg)
+{
+ switch(arg) {
+ case 'm':
+ m_numMsgs = uint32_t(std::atol(optarg));
+ break;
+ case 'S':
+ m_msgSize = uint32_t(std::atol(optarg));
+ break;
+ case 'q':
+ m_numQueues = uint16_t(std::atoi(optarg));
+ break;
+ case 'e':
+ m_numEnqThreadsPerQueue = uint16_t(std::atoi(optarg));
+ break;
+ case 'd':
+ m_numDeqThreadsPerQueue = uint16_t(std::atoi(optarg));
+ break;
+ default:
+ return false;
+ }
+ return true;
+}
+
+// static
+void
+TestParameters::printArgs(std::ostream& os)
+{
+ os << "Test parameters:" << std::endl;
+ os << " -m --num_msgs: Number of messages to send per enqueue thread ["
+ << TestParameters::s_defaultNumMsgs << "]" << std::endl;
+ os << " -S --msg_size: Size of each message to be sent ["
+ << TestParameters::s_defaultMsgSize << "]" << std::endl;
+ os << " -q --num_queues: Number of simultaneous queues ["
+ << TestParameters::s_defaultNumQueues << "]" << std::endl;
+ os << " -e --num_enq_threads_per_queue: Number of enqueue threads per queue ["
+ << TestParameters::s_defaultEnqThreadsPerQueue << "]" << std::endl;
+ os << " -d --num_deq_threads_per_queue: Number of dequeue threads per queue ["
+ << TestParameters::s_defaultDeqThreadsPerQueue << "]" << std::endl;
+ os << std::endl;
+}
+
+// static
+std::string
+TestParameters::shortArgs()
+{
+ return "m:S:q:e:d:";
+}
+
+}}} // namespace tests::storePerftools::common
diff --git a/cpp/src/tests/storePerfTools/common/TestParameters.h b/cpp/src/tests/storePerfTools/common/TestParameters.h
new file mode 100644
index 0000000000..ea73589609
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/TestParameters.h
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestParameters.h
+ */
+
+#ifndef tests_storePerfTools_common_TestParameters_h_
+#define tests_storePerfTools_common_TestParameters_h_
+
+#include "Parameters.h"
+
+#include <stdint.h> // uint16_t, uint32_t
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+class TestOptions;
+
+/**
+ * \brief Struct for aggregating the test parameters
+ *
+ * This struct is used to aggregate and keep together all the test parameters. These affect the test itself, the
+ * journal geometry is aggregated in class JrnlParameters.
+ */
+class TestParameters : public Parameters
+{
+public:
+ static uint32_t s_defaultNumMsgs; ///< Default number of messages to be sent
+ static uint32_t s_defaultMsgSize; ///< Default message size in bytes
+ static uint16_t s_defaultNumQueues; ///< Default number of queues to test simultaneously
+ static uint16_t s_defaultEnqThreadsPerQueue; ///< Default number of enqueue threads per queue
+ static uint16_t s_defaultDeqThreadsPerQueue; ///< Default number of dequeue threads per queue
+
+ uint32_t m_numMsgs; ///< Number of messages to be sent
+ uint32_t m_msgSize; ///< Message size in bytes
+ uint16_t m_numQueues; ///< Number of queues to test simultaneously
+ uint16_t m_numEnqThreadsPerQueue; ///< Number of enqueue threads per queue
+ uint16_t m_numDeqThreadsPerQueue; ///< Number of dequeue threads per queue
+
+ /**
+ * \brief Defaault constructor
+ *
+ * Default constructor. Uses the default values for all parameters.
+ */
+ TestParameters();
+
+ /**
+ * \brief Constructor
+ *
+ * Convenience constructor.
+ *
+ * \param numMsgs Number of messages to be sent
+ * \param msgSize Message size in bytes
+ * \param numQueues Number of queues to test simultaneously
+ * \param numEnqThreadsPerQueue Number of enqueue threads per queue
+ * \param numDeqThreadsPerQueue Number of dequeue threads per queue
+ */
+ TestParameters(const uint32_t numMsgs,
+ const uint32_t msgSize,
+ const uint16_t numQueues,
+ const uint16_t numEnqThreadsPerQueue,
+ const uint16_t numDeqThreadsPerQueue);
+
+ /**
+ * \brief Copy constructor
+ *
+ * \param tp Reference to JrnlPerfTestParameters instance to be copied
+ */
+ TestParameters(const TestParameters& tp);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~TestParameters();
+
+ virtual bool parseArg(const int arg,
+ const char* optarg);
+
+ static void printArgs(std::ostream& os);
+
+ static std::string shortArgs();
+
+ /***
+ * \brief Stream the test parameters to an output stream
+ *
+ * Convenience feature which streams a multi-line representation of all the test parameters, one per line to an
+ * output stream.
+ *
+ * \param os Output stream to which the class data is to be streamed
+ */
+ void toStream(std::ostream& os = std::cout) const;
+
+};
+
+}}} // namespace tests::storePerftools::common
+
+#endif // tests_storePerfTools_common_TestParameters_h_
diff --git a/cpp/src/tests/storePerfTools/common/TestResult.cpp b/cpp/src/tests/storePerfTools/common/TestResult.cpp
new file mode 100644
index 0000000000..c3e9d27dfc
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/TestResult.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestResult.cpp
+ */
+
+#include "TestResult.h"
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+TestResult::TestResult() :
+ ScopedTimable(),
+ Streamable()
+{}
+
+TestResult::~TestResult()
+{}
+
+}}} // namespace tests::storePerftools::common
diff --git a/cpp/src/tests/storePerfTools/common/TestResult.h b/cpp/src/tests/storePerfTools/common/TestResult.h
new file mode 100644
index 0000000000..e2217286b4
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/TestResult.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestResult.h
+ */
+
+#ifndef tests_storePerfTools_common_TestResult_h_
+#define tests_storePerfTools_common_TestResult_h_
+
+#include "ScopedTimable.h"
+#include "Streamable.h"
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+class TestResult : public ScopedTimable, public Streamable
+{
+public:
+ TestResult();
+ virtual ~TestResult();
+ void toStream(std::ostream& os = std::cout) const = 0;
+};
+
+}}} // namespace tests:storePerftools::common
+
+#endif // tests_storePerfTools_common_TestResult_h_
diff --git a/cpp/src/tests/storePerfTools/common/Thread.cpp b/cpp/src/tests/storePerfTools/common/Thread.cpp
new file mode 100644
index 0000000000..188e102e8f
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/Thread.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Thread.cpp
+ */
+
+#include "Thread.h"
+
+#include "PerftoolError.h"
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+Thread::Thread(startFn_t sf,
+ void* p) :
+ m_running(true)
+{
+ PTHREAD_CHK(::pthread_create(&m_thread, NULL, sf, p), "::pthread_create", "Thread", "Thread");
+}
+
+Thread::Thread(Thread::startFn_t sf,
+ void* p,
+ const std::string& id) :
+ m_id(id),
+ m_running(true)
+{
+ PTHREAD_CHK(::pthread_create(&m_thread, NULL, sf, p), "::pthread_create", "Thread", "Thread");
+}
+
+Thread::~Thread()
+{
+ if (m_running) {
+ PTHREAD_CHK(::pthread_detach(m_thread), "pthread_detach", "~Thread", "Thread");
+ }
+}
+
+const std::string&
+Thread::getId() const
+{
+ return m_id;
+}
+
+void Thread::join()
+{
+ PTHREAD_CHK(::pthread_join(m_thread, NULL), "pthread_join", "join", "Thread");
+ m_running = false;
+}
+
+}}} // namespace tests::storePerftools::common
diff --git a/cpp/src/tests/storePerfTools/common/Thread.h b/cpp/src/tests/storePerfTools/common/Thread.h
new file mode 100644
index 0000000000..505d038162
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/common/Thread.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Thread.h
+ */
+
+#ifndef tests_storePerfTools_common_Thread_h_
+#define tests_storePerfTools_common_Thread_h_
+
+#include <pthread.h>
+#include <string>
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+
+/**
+ * \brief Ultra-simple pthread class.
+ */
+class Thread {
+public:
+ typedef void*(*startFn_t)(void*); ///< Thread entry point function pointer type
+
+ /**
+ * \brief Constructor
+ * \param sf Pointer to thread entry function
+ * \param p Void pointer to parameter of start function
+ */
+ Thread(startFn_t sf,
+ void* p);
+
+ /**
+ * \brief Constructor
+ * \param sf Pointer to thread entry function
+ * \param p Void pointer to parameter of start function
+ * \param id Name of this thread instance
+ */
+ Thread(startFn_t sf,
+ void* p,
+ const std::string& id);
+
+ /**
+ * \brief Destructor
+ */
+ virtual ~Thread();
+
+ /**
+ * \brief Get the name of this thread.
+ * \return Name as supplied to the constructor.
+ */
+ const std::string& getId() const;
+
+ /**
+ * \brief Wait for this thread instance to finish running startFn().
+ */
+ void join();
+
+private:
+ ::pthread_t m_thread; ///< Internal posix thread
+ std::string m_id; ///< Identifier for this thread instance
+ bool m_running; ///< \b true is the thread is active and running, \b false when not yet started or joined.
+
+};
+
+}}} // namespace tests::storePerftools::common
+
+#endif // tests_storePerfTools_common_Thread_h_
diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp
new file mode 100644
index 0000000000..6efdc06fc8
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Journal.cpp
+ */
+
+#include "Journal.h"
+
+#ifdef JOURNAL2
+# include "qpid/asyncStore/jrnl2/DataToken.h"
+
+# define X_JRNL_FN_DEQUEUE(dtok) dequeue(dtok, 0, 0)
+# define X_JRNL_FN_ENQUEUE(dtok, msgData, msgSize) enqueue(dtok, msgData, msgSize, 0, 0, false)
+# define X_JRNL_FN_FLUSH(jrnlPtr) { jrnlPtr->flush(); jrnlPtr->sync(); }
+# define X_JRNL_FN_GETDTOKSTATUS(dtok) dtok
+# define X_JRNL_FN_GETEVENTS(timeout) processCompletedAioWriteEvents(timeout)
+# define X_JRNL_FN_GETIOSTR(iores) qpid::asyncStore::jrnl2::g_ioResAsString(iores)
+# define X_JRNL_IO_OP_RES qpid::asyncStore::jrnl2::jrnlOpRes
+# define X_JRNL_IO_OP_RES_BUSY qpid::asyncStore::jrnl2::RHM_IORES_BUSY
+# define X_JRNL_IO_OP_RES_ENQCAPTHRESH qpid::asyncStore::jrnl2::RHM_IORES_ENQCAPTHRESH
+# define X_JRNL_IO_OP_RES_SUCCESS 0
+# define X_SCOPED_LOCK qpid::asyncStore::jrnl2::ScopedLock
+#else
+# include "jrnl/jcntl.hpp"
+# include "jrnl/data_tok.hpp"
+
+# define X_JRNL_FN_DEQUEUE(dtok) dequeue_data_record(dtok)
+# define X_JRNL_FN_ENQUEUE(dtok, msgData, msgSize) enqueue_data_record(msgData, msgSize, msgSize, dtok);
+# define X_JRNL_FN_FLUSH(jrnlPtr) jrnlPtr->flush(true)
+# define X_JRNL_FN_GETDTOKSTATUS(dtok) dtok->status_str()
+# define X_JRNL_FN_GETEVENTS(timeout) get_wr_events(timeout)
+# define X_JRNL_FN_GETIOSTR(iores) mrg::journal::iores_str(iores)
+# define X_JRNL_IO_OP_RES mrg::journal::iores
+# define X_JRNL_IO_OP_RES_BUSY mrg::journal::RHM_IORES_BUSY
+# define X_JRNL_IO_OP_RES_ENQCAPTHRESH mrg::journal::RHM_IORES_ENQCAPTHRESH
+# define X_JRNL_IO_OP_RES_SUCCESS mrg::journal::RHM_IORES_SUCCESS
+# define X_SCOPED_LOCK mrg::journal::slock
+#endif
+
+#include <iostream>
+
+namespace tests {
+namespace storePerftools {
+namespace jrnlPerf {
+
+Journal::Journal(const uint32_t numMsgs,
+ const uint32_t msgSize,
+ const char* msgData,
+ X_ASYNC_JOURNAL* const jrnlPtr) :
+ m_numMsgs(numMsgs),
+ m_msgSize(msgSize),
+ m_msgData(msgData),
+ m_jrnlPtr(jrnlPtr)
+{}
+
+Journal::~Journal()
+{
+ delete m_jrnlPtr;
+}
+
+
+// *** MUST BE THREAD-SAFE ****
+// This method will be called by multiple threads simultaneously
+// Enqueue thread entry point
+void*
+Journal::runEnqueues()
+{
+ bool misfireFlag = false;
+ uint32_t i = 0;
+ while (i < m_numMsgs) {
+ X_DATA_TOKEN* mtokPtr = new X_DATA_TOKEN();
+ X_JRNL_IO_OP_RES jrnlIoRes = m_jrnlPtr->X_JRNL_FN_ENQUEUE(mtokPtr, m_msgData, m_msgSize);
+ switch (jrnlIoRes) {
+ case X_JRNL_IO_OP_RES_SUCCESS:
+ i++;
+ misfireFlag = false;
+ break;
+ case X_JRNL_IO_OP_RES_BUSY:
+ if (!misfireFlag) {
+ std::cout << "-" << std::flush;
+ }
+ delete mtokPtr;
+ misfireFlag = true;
+ break;
+ case X_JRNL_IO_OP_RES_ENQCAPTHRESH:
+ if (!misfireFlag) {
+ std::cout << "*" << std::flush;
+ }
+ delete mtokPtr;
+ misfireFlag = true;
+ ::usleep(10);
+ break;
+ default:
+ delete mtokPtr;
+ std::cerr << "enqueue_data_record FAILED with " << X_JRNL_FN_GETIOSTR(jrnlIoRes) << std::endl;
+ }
+ }
+ /// \todo handle these results
+ X_JRNL_FN_FLUSH(m_jrnlPtr);
+ return NULL;
+}
+
+
+// *** MUST BE THREAD-SAFE ****
+// This method will be called by multiple threads simultaneously
+// Dequeue thread entry point
+void*
+Journal::runDequeues()
+{
+ uint32_t i = 0;
+ X_JRNL_IO_OP_RES jrnlIoRes;
+ while (i < m_numMsgs) {
+ X_DATA_TOKEN* mtokPtr = 0;
+ while (!mtokPtr) {
+ bool procAioEventsFlag;
+ { // --- START OF CRITICAL SECTION ---
+ X_SCOPED_LOCK l(m_unprocCallbacksMutex);
+ procAioEventsFlag = m_unprocCallbacks.empty();
+ if (!procAioEventsFlag) {
+ mtokPtr = m_unprocCallbacks.back();
+ m_unprocCallbacks.pop_back();
+ }
+ } // --- END OF CRITICAL SECTION ---
+ if (procAioEventsFlag) {
+ m_jrnlPtr->X_JRNL_FN_GETEVENTS(0);
+ ::usleep(1);
+ }
+ }
+ bool done = false;
+ while (!done) {
+ jrnlIoRes = m_jrnlPtr->X_JRNL_FN_DEQUEUE(mtokPtr);
+ switch (jrnlIoRes) {
+ case X_JRNL_IO_OP_RES_SUCCESS:
+ i ++;
+ done = true;
+ break;
+ case X_JRNL_IO_OP_RES_BUSY:
+ //::usleep(10);
+ break;
+ default:
+ std::cerr << "dequeue_data_record FAILED with " << X_JRNL_FN_GETIOSTR(jrnlIoRes) << ": "
+ << X_JRNL_FN_GETDTOKSTATUS(mtokPtr) << std::endl;
+ delete mtokPtr;
+ done = true;
+ }
+ }
+ m_jrnlPtr->X_JRNL_FN_GETEVENTS(0);
+ }
+ /// \todo handle these results
+ X_JRNL_FN_FLUSH(m_jrnlPtr);
+ return NULL;
+}
+
+//static
+void*
+Journal::startEnqueues(void* ptr)
+{
+ return reinterpret_cast<Journal*>(ptr)->runEnqueues();
+}
+
+//static
+void*
+Journal:: startDequeues(void* ptr)
+{
+ return reinterpret_cast<Journal*>(ptr)->runDequeues();
+}
+
+// *** MUST BE THREAD-SAFE ****
+// This method will be called by multiple threads simultaneously
+void
+Journal::X_AIO_WR_CALLBACK(std::vector<X_DATA_TOKEN*>& msgTokenList)
+{
+ X_DATA_TOKEN* mtokPtr;
+ while (msgTokenList.size()) {
+ mtokPtr = msgTokenList.back();
+ msgTokenList.pop_back();
+#ifdef JOURNAL2
+ switch (mtokPtr->getDataOpState().get()) {
+ case qpid::asyncStore::jrnl2::OP_ENQUEUE:
+#else
+ switch (mtokPtr->wstate()) {
+ case X_DATA_TOKEN::ENQ:
+#endif
+ { // --- START OF CRITICAL SECTION ---
+ X_SCOPED_LOCK l(m_unprocCallbacksMutex);
+ m_unprocCallbacks.push_back(mtokPtr);
+ } // --- END OF CRITICAL SECTION ---
+ break;
+ default:
+ delete mtokPtr;
+ }
+ }
+}
+
+// *** MUST BE THREAD-SAFE ****
+// This method will be called by multiple threads simultaneously
+void
+Journal::X_AIO_RD_CALLBACK(std::vector<uint16_t>& /*buffPageCtrlBlkIndexList*/)
+{}
+
+}}} // namespace tests::storePerftools::jrnlPerf
diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/Journal.h b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.h
new file mode 100644
index 0000000000..19d0980b53
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.h
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Journal.h
+ */
+
+#ifndef tests_storePerfTools_jrnlPerf_Journal_h_
+#define tests_storePerfTools_jrnlPerf_Journal_h_
+
+#ifdef JOURNAL2
+# include "qpid/asyncStore/jrnl2/AioCallback.h"
+# include "qpid/asyncStore/jrnl2/AsyncJournal.h"
+# include "qpid/asyncStore/jrnl2/ScopedLock.h"
+#else
+# include "jrnl/aio_callback.hpp"
+# include "jrnl/smutex.hpp"
+#endif
+
+#include <stdint.h> // uint16_t, uint32_t
+
+#ifdef JOURNAL2
+# define X_AIO_CALLBACK qpid::asyncStore::jrnl2::AioCallback
+# define X_AIO_RD_CALLBACK readAioCompleteCallback
+# define X_AIO_WR_CALLBACK writeAioCompleteCallback
+# define X_ASYNC_JOURNAL qpid::asyncStore::jrnl2::AsyncJournal
+# define X_DATA_TOKEN qpid::asyncStore::jrnl2::DataToken
+# define X_SCOPED_MUTEX qpid::asyncStore::jrnl2::ScopedMutex
+#else
+# define X_AIO_CALLBACK mrg::journal::aio_callback
+# define X_AIO_RD_CALLBACK rd_aio_cb
+# define X_AIO_WR_CALLBACK wr_aio_cb
+# define X_ASYNC_JOURNAL mrg::journal::jcntl
+# define X_DATA_TOKEN mrg::journal::data_tok
+# define X_SCOPED_MUTEX mrg::journal::smutex
+#endif
+
+#ifndef JOURNAL2
+namespace mrg {
+namespace journal {
+class jcntl;
+}} // namespace mrg::journal
+namespace qpid {
+namespace asyncStore {
+namespace jrnl2 {
+class AsyncJournal;
+}}} // namespace qpid::asyncStore::jrnl2
+#endif
+
+namespace tests {
+namespace storePerftools {
+namespace jrnlPerf {
+
+/**
+ * \brief Test journal instance. Each queue to be tested results in one instance of this class.
+ *
+ * Journal test harness which contains the journal to be tested. Each queue to be tested in the test parameters
+ * results in one instance of this class being instantiated, and consequently one set of journals on disk. The
+ * journal instance is provided as a pointer to the constructor.
+ */
+class Journal : public X_AIO_CALLBACK
+{
+public:
+ /**
+ * \brief Constructor
+ *
+ * \param numMsgs Number of messages per thread to be enqueued then dequeued (ie both ways through broker)
+ * \param msgSize Size of each message being enqueued
+ * \param msgData Pointer to message content (all messages have identical content)
+ * \param jrnlPtr Pinter to journal instance which is to be tested
+ */
+ Journal(const uint32_t numMsgs,
+ const uint32_t msgSize,
+ const char* msgData,
+ X_ASYNC_JOURNAL* const jrnlPtr);
+
+ /**
+ * \brief virtual destructor
+ */
+ virtual ~Journal();
+
+ /**
+ * \brief Worker thread enqueue task
+ *
+ * This function is the worker thread enqueue task entry point. It enqueues _numMsgs onto the journal instance.
+ * A data tokens is created for each record, this is the start of the data token life cycle. All possible
+ * returns from the journal are handled appropriately. Since the enqueue threads also perform
+ * callbacks on completed AIO operations, the data tokens from completed enqueues are placed onto the
+ * unprocessed callback list (_unprocCallbackList) for dequeueing by the dequeue worker thread(s).
+ *
+ * This function must be thread safe.
+ */
+ void* runEnqueues();
+
+ /**
+ * \brief Worker thread dequeue task
+ *
+ * This function is the worker thread dequeue task entry point. It dequeues messages which are on the
+ * unprocessed callback list (_unprocCallbackList).
+ *
+ * This function must be thread safe.
+ */
+ void* runDequeues();
+
+ /**
+ * \brief Helper function to launch the run() function when starting a thread.
+ */
+ static void* startEnqueues(void* ptr);
+
+ /**
+ * \brief Helper function to launch the run() function when starting a thread.
+ */
+ static void* startDequeues(void* ptr);
+
+ /**
+ * \brief Write callback function. When AIO operations return, this function is called.
+ *
+ * When AIO operations return, this function will sort the enqueue ops from the rest and place the data tokens
+ * of these records onto the unprocessed callback list (_unprocCallbackList) for dequeueing by another thread.
+ *
+ * Returning dequeue ops have their data tokens destroyed, as this is the end of the life cycle of the data
+ * tokens.
+ *
+ * Required by all subclasses of mrg::journal::aio_callback.
+ *
+ * \param dataTokenList A vector of data tokens for those messages which have completed their AIO write
+ * operations
+ */
+ void X_AIO_WR_CALLBACK(std::vector<X_DATA_TOKEN*>& dataTokenList);
+
+ /**
+ * \brief Read callback function. When read AIO operations return, this function is called.
+ *
+ * Not used in this test, but required by all subclasses of mrg::journal::aio_callback.
+ *
+ * \param buffPageCtrlBlkIndexList A vector of indices to the buffer page control blocks for completed reads
+ */
+ void X_AIO_RD_CALLBACK(std::vector<uint16_t>& buffPageCtrlBlkIndexList);
+
+protected:
+ const uint32_t m_numMsgs; ///< Number of messages to be processed by this journal instance
+ const uint32_t m_msgSize; ///< Size of each message (in bytes)
+ const char* m_msgData; ///< Pointer to message content to be used for each message.
+ X_ASYNC_JOURNAL* const m_jrnlPtr; ///< Journal instance pointer
+ std::vector<X_DATA_TOKEN*> m_unprocCallbacks; ///< List of unprocessed callbacks to be dequeued
+ X_SCOPED_MUTEX m_unprocCallbacksMutex; ///< Mutex which protects the unprocessed callback queue
+
+
+};
+
+}}} // namespace tests::storePerftools::jrnlPerf
+
+#endif // tests_storePerfTools_jrnlPerf_Journal_h_
diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp
new file mode 100644
index 0000000000..2b07619041
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file JournalParameters.cpp
+ */
+
+#include "JournalParameters.h"
+
+#include <cstdlib> // std::atof, std::atoi, std::atol
+
+namespace tests {
+namespace storePerftools {
+namespace jrnlPerf {
+
+#ifndef JOURNAL2
+// static declarations - for jrnl2, these are inherited
+std::string JournalParameters::s_defaultJrnlDir = "/tmp/store";
+std::string JournalParameters::s_defaultJrnlBaseFileName = "JournalData";
+uint16_t JournalParameters::s_defaultNumJrnlFiles = 8;
+uint32_t JournalParameters::s_defaultJrnlFileSize_sblks = 3072;
+uint16_t JournalParameters::s_defaultWriteBuffNumPgs = 32;
+uint32_t JournalParameters::s_defaultWriteBuffPgSize_sblks = 128;
+#endif
+
+JournalParameters::JournalParameters() :
+#ifdef JOURNAL2
+ qpid::asyncStore::jrnl2::JournalParameters()
+#else
+ Parameters(),
+ m_jrnlDir(s_defaultJrnlDir),
+ m_jrnlBaseFileName(s_defaultJrnlBaseFileName),
+ m_numJrnlFiles(s_defaultNumJrnlFiles),
+ m_jrnlFileSize_sblks(s_defaultJrnlFileSize_sblks),
+ m_writeBuffNumPgs(s_defaultWriteBuffNumPgs),
+ m_writeBuffPgSize_sblks(s_defaultWriteBuffPgSize_sblks)
+#endif
+{}
+
+JournalParameters::JournalParameters(const std::string& jrnlDir,
+ const std::string& jrnlBaseFileName,
+ const uint16_t numJrnlFiles,
+ const uint32_t jrnlFileSize_sblks,
+ const uint16_t writeBuffNumPgs,
+ const uint32_t writeBuffPgSize_sblks) :
+#ifdef JOURNAL2
+ qpid::asyncStore::jrnl2::JournalParameters(jrnlDir, jrnlBaseFileName, numJrnlFiles, jrnlFileSize_sblks, writeBuffNumPgs,
+ writeBuffPgSize_sblks)
+#else
+ Parameters(),
+ m_jrnlDir(jrnlDir),
+ m_jrnlBaseFileName(jrnlBaseFileName),
+ m_numJrnlFiles(numJrnlFiles),
+ m_jrnlFileSize_sblks(jrnlFileSize_sblks),
+ m_writeBuffNumPgs(writeBuffNumPgs),
+ m_writeBuffPgSize_sblks(writeBuffPgSize_sblks)
+#endif
+{}
+
+#ifdef JOURNAL2
+JournalParameters::JournalParameters(const qpid::asyncStore::jrnl2::JournalParameters& jp) :
+ qpid::asyncStore::jrnl2::JournalParameters(jp)
+{}
+#endif
+
+JournalParameters::JournalParameters(const JournalParameters& jp) :
+#ifdef JOURNAL2
+ qpid::asyncStore::jrnl2::JournalParameters(jp)
+#else
+ Parameters(),
+ m_jrnlDir(jp.m_jrnlDir),
+ m_jrnlBaseFileName(jp.m_jrnlBaseFileName),
+ m_numJrnlFiles(jp.m_numJrnlFiles),
+ m_jrnlFileSize_sblks(jp.m_jrnlFileSize_sblks),
+ m_writeBuffNumPgs(jp.m_writeBuffNumPgs),
+ m_writeBuffPgSize_sblks(jp.m_writeBuffPgSize_sblks)
+#endif
+{}
+
+JournalParameters::~JournalParameters()
+{}
+
+void
+JournalParameters::toStream(std::ostream& os) const
+{
+ os << "Journal Parameters:" << std::endl;
+ os << " jrnlDir = \"" << m_jrnlDir << "\"" << std::endl;
+ os << " jrnlBaseFileName = \"" << m_jrnlBaseFileName << "\"" << std::endl;
+ os << " numJrnlFiles = " << m_numJrnlFiles << std::endl;
+ os << " jrnlFileSize_sblks = " << m_jrnlFileSize_sblks << std::endl;
+ os << " writeBuffNumPgs = " << m_writeBuffNumPgs << std::endl;
+ os << " writeBuffPgSize_sblks = " << m_writeBuffPgSize_sblks << std::endl;
+}
+
+bool
+JournalParameters::parseArg(const int arg,
+ const char* optarg)
+{
+ switch(arg) {
+ case 'j':
+ m_jrnlDir.assign(optarg);
+ break;
+ case 'b':
+ m_jrnlBaseFileName.assign(optarg);
+ break;
+ case 'f':
+ m_numJrnlFiles = uint16_t(std::atoi(optarg));
+ break;
+ case 's':
+ m_jrnlFileSize_sblks = uint32_t(std::atol(optarg));
+ break;
+ case 'p':
+ m_writeBuffNumPgs = uint16_t(std::atoi(optarg));
+ break;
+ case 'c':
+ m_writeBuffPgSize_sblks = uint32_t(std::atol(optarg));
+ break;
+ default:
+ return false;
+ }
+ return true;
+}
+
+// static
+void
+JournalParameters::printArgs(std::ostream& os)
+{
+ os << "Journal parameters:" << std::endl;
+ os << " -j --jrnl_dir: Store directory [\""
+ << JournalParameters::s_defaultJrnlDir << "\"]" << std::endl;
+ os << " -b --jrnl_base_filename: Base name for journal files [\""
+ << JournalParameters::s_defaultJrnlBaseFileName << "\"]" << std::endl;
+ os << " -f --num_jfiles: Number of journal files ["
+ << JournalParameters::s_defaultNumJrnlFiles << "]" << std::endl;
+ os << " -s --jfsize_sblks: Size of each journal file in sblks (512 byte blocks) ["
+ << JournalParameters::s_defaultJrnlFileSize_sblks << "]" << std::endl;
+ os << " -p --wcache_num_pages: Number of write buffer pages ["
+ << JournalParameters::s_defaultWriteBuffNumPgs << "]" << std::endl;
+ os << " -c --wcache_pgsize_sblks: Size of each write buffer page in sblks (512 byte blocks) ["
+ << JournalParameters::s_defaultWriteBuffPgSize_sblks << "]" << std::endl;
+}
+
+// static
+std::string
+JournalParameters::shortArgs()
+{
+ return "j:b:f:s:p:c:";
+}
+
+}}} // namespace tests::storePerftools::jrnlPerf
diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h b/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h
new file mode 100644
index 0000000000..ab7f864a91
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file JournalParameters.h
+ */
+
+#ifndef tests_storePerfTools_jrnlPerf_JournalParameters_h_
+#define tests_storePerfTools_jrnlPerf_JournalParameters_h_
+
+#include "tests/storePerfTools/common/Parameters.h"
+
+#ifdef JOURNAL2
+# include "qpid/asyncStore/jrnl2/JournalParameters.h"
+#endif
+
+#include <stdint.h> // uint16_6, uint32_t
+
+namespace tests {
+namespace storePerftools {
+namespace jrnlPerf {
+
+/**
+ * \brief Stuct for aggregating the common journal parameters
+ *
+ * This struct is used to aggregate and keep together all the common journal parameters. These affect the journal
+ * geometry and buffers. The test parameters are aggregated in class JrnlPerfTestParameters.
+ */
+class JournalParameters :
+#ifdef JOURNAL2
+ public qpid::asyncStore::jrnl2::JournalParameters,
+#endif
+ public tests::storePerftools::common::Parameters
+{
+public:
+#ifndef JOURNAL2
+ // static default store params
+ static std::string s_defaultJrnlDir; ///< Default journal directory
+ static std::string s_defaultJrnlBaseFileName; ///< Default journal base file name
+ static uint16_t s_defaultNumJrnlFiles; ///< Default number of journal data files
+ static uint32_t s_defaultJrnlFileSize_sblks; ///< Default journal data file size in softblocks
+ static uint16_t s_defaultWriteBuffNumPgs; ///< Default number of write buffer pages
+ static uint32_t s_defaultWriteBuffPgSize_sblks; ///< Default size of each write buffer page in softblocks
+
+ std::string m_jrnlDir; ///< Journal directory
+ std::string m_jrnlBaseFileName; ///< Journal base file name
+ uint16_t m_numJrnlFiles; ///< Number of journal data files
+ uint32_t m_jrnlFileSize_sblks; ///< Journal data file size in softblocks
+ uint16_t m_writeBuffNumPgs; ///< Number of write buffer pages
+ uint32_t m_writeBuffPgSize_sblks; ///< Size of each write buffer page in softblocks
+#endif
+
+ /**
+ * \brief Default constructor
+ *
+ * Default constructor. Uses the default values for all parameters.
+ */
+ JournalParameters();
+
+ /**
+ * \brief Constructor
+ *
+ * Convenience constructor.
+ *
+ * \param jrnlDir Journal directory
+ * \param jrnlBaseFileName Journal base file name
+ * \param numJrnlFiles Number of journal data files
+ * \param jrnlFileSize_sblks Journal data file size in softblocks
+ * \param writeBuffNumPgs Number of write buffer pages
+ * \param writeBuffPgSize_sblks Size of each write buffer page in softblocks
+ */
+ JournalParameters(const std::string& jrnlDir,
+ const std::string& jrnlBaseFileName,
+ const uint16_t numJrnlFiles,
+ const uint32_t jrnlFileSize_sblks,
+ const uint16_t writeBuffNumPgs,
+ const uint32_t writeBuffPgSize_sblks);
+
+ /**
+ * \brief Copy constructor
+ *
+ * \param jp Reference to JrnlParameters instance to be copied
+ */
+#ifdef JOURNAL2
+ JournalParameters(const qpid::asyncStore::jrnl2::JournalParameters& jp);
+#endif
+ JournalParameters(const JournalParameters& jp);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~JournalParameters();
+
+ virtual bool parseArg(const int arg,
+ const char* optarg);
+
+ static void printArgs(std::ostream& os);
+
+ static std::string shortArgs();
+
+ /***
+ * \brief Stream the journal parameters to an output stream
+ *
+ * Convenience feature which streams a multi-line representation of all the journal parameters, one per line to
+ * an output stream.
+ *
+ * \param os Output stream to which the class data is to be streamed
+ */
+ void toStream(std::ostream& os = std::cout) const;
+
+};
+
+}}} // namespace tests::storePerftools::jrnlPerf
+
+#endif // tests_storePerfTools_jrnlPerf_JournalParameters_h_
diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp
new file mode 100644
index 0000000000..17a9c9b6cb
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file PerfTest.cpp
+ */
+
+#include "PerfTest.h"
+
+#include "Journal.h"
+#include "JournalParameters.h"
+
+#include "tests/storePerfTools/version.h"
+#include "tests/storePerfTools/common/ScopedTimer.h"
+#include "tests/storePerfTools/common/TestParameters.h"
+#include "tests/storePerfTools/common/Thread.h"
+
+#ifdef JOURNAL2
+# include "qpid/asyncStore/jrnl2/AsyncJournal.h"
+# include "qpid/asyncStore/jrnl2/JournalDirectory.h"
+#else
+# include "jrnl/jcntl.hpp"
+# include "jrnl/jdir.hpp"
+#endif
+
+#include <deque>
+#include <getopt.h> // getopt_long(), required_argument, no_argument
+#include <iomanip> // std::setw() std::setfill()
+#include <sstream> // std::ostringstream
+#include <stdint.h> // uint16_t, uint32_t
+
+#ifdef ECLIPSE_CDT_ANNOYANCE // This prevents problems with Eclipse CODAN, which can't see this in getopt.h
+ struct option;
+ extern int getopt_long (int, char *const *, const char *, const struct option *, int *) __THROW;
+# define no_argument 0
+# define required_argument 1
+# define optional_argument 2
+#endif
+
+namespace tests {
+namespace storePerftools {
+namespace jrnlPerf {
+
+PerfTest::PerfTest(const tests::storePerftools::common::TestParameters& tp,
+ const JournalParameters& jp) :
+ Streamable(),
+ m_testParams(tp),
+ m_jrnlParams(jp),
+ m_testResult(tp),
+ m_msgData(new char[tp.m_msgSize])
+{}
+
+PerfTest::~PerfTest()
+{
+ delete[] m_msgData;
+}
+
+void
+PerfTest::prepareJournals(std::vector<Journal*>& jrnlList)
+{
+#ifdef JOURNAL2
+ if (qpid::asyncStore::jrnl2::JournalDirectory::s_exists(m_jrnlParams.m_jrnlDir)) {
+ qpid::asyncStore::jrnl2::JournalDirectory::s_destroy(m_jrnlParams.m_jrnlDir);
+ }
+ qpid::asyncStore::jrnl2::JournalDirectory::s_create(m_jrnlParams.m_jrnlDir);
+ qpid::asyncStore::jrnl2::AsyncJournal* jp;
+#else
+ if (mrg::journal::jdir::exists(m_jrnlParams.m_jrnlDir)) {
+ mrg::journal::jdir::delete_dir(m_jrnlParams.m_jrnlDir);
+ }
+ mrg::journal::jdir::create_dir(m_jrnlParams.m_jrnlDir);
+ mrg::journal::jcntl* jp;
+#endif
+ Journal* ptp;
+ for (uint16_t j = 0; j < m_testParams.m_numQueues; j++) {
+ std::ostringstream jname;
+ jname << "jrnl_" << std::setw(4) << std::setfill('0') << j;
+ std::ostringstream jdir;
+ jdir << m_jrnlParams.m_jrnlDir << "/" << jname.str();
+#ifdef JOURNAL2
+ jp = new qpid::asyncStore::jrnl2::AsyncJournal(jname.str(), jdir.str(), m_jrnlParams.m_jrnlBaseFileName);
+#else
+ jp = new mrg::journal::jcntl(jname.str(), jdir.str(), m_jrnlParams.m_jrnlBaseFileName);
+#endif
+ ptp = new Journal(m_testParams.m_numMsgs, m_testParams.m_msgSize, m_msgData, jp);
+#ifdef JOURNAL2
+ jp->initialize(&m_jrnlParams, ptp);
+#else
+ jp->initialize(m_jrnlParams.m_numJrnlFiles, false, m_jrnlParams.m_numJrnlFiles,
+ m_jrnlParams.m_jrnlFileSize_sblks, m_jrnlParams.m_writeBuffNumPgs,
+ m_jrnlParams.m_writeBuffPgSize_sblks, ptp);
+#endif
+
+ jrnlList.push_back(ptp);
+ }
+}
+
+void
+PerfTest::destroyJournals(std::vector<Journal*>& jrnlList)
+{
+ while (jrnlList.size()) {
+ delete jrnlList.back();
+ jrnlList.pop_back();
+ }
+}
+
+void
+PerfTest::run()
+{
+ std::vector<Journal*> jrnlList;
+ prepareJournals(jrnlList);
+
+ std::deque<tests::storePerftools::common::Thread*> threads;
+ tests::storePerftools::common::Thread* tp;
+ { // --- Start of timed section ---
+ tests::storePerftools::common::ScopedTimer st(m_testResult);
+
+ for (uint16_t q = 0; q < m_testParams.m_numQueues; q++) {
+ for (uint16_t t = 0; t < m_testParams.m_numEnqThreadsPerQueue; t++) {
+ tp = new tests::storePerftools::common::Thread(jrnlList[q]->startEnqueues, reinterpret_cast<void*>(jrnlList[q]));
+ threads.push_back(tp);
+ }
+ for (uint16_t dt = 0; dt < m_testParams.m_numDeqThreadsPerQueue; ++dt) {
+ tp = new tests::storePerftools::common::Thread(jrnlList[q]->startDequeues, reinterpret_cast<void*>(jrnlList[q]));
+ threads.push_back(tp);
+ }
+ }
+
+ while (threads.size()) {
+ threads.front()->join();
+ delete threads.front();
+ threads.pop_front();
+ }
+ } // --- End of timed section ---
+ destroyJournals(jrnlList);
+}
+
+void
+PerfTest::toStream(std::ostream& os) const
+{
+ os << m_testParams << std::endl;
+ os << m_jrnlParams << std::endl;
+ os << m_testResult << std::endl;
+}
+
+void
+printArgs(std::ostream& os)
+{
+ os << " -h --help: This help message" << std::endl;
+ os << std::endl;
+
+ tests::storePerftools::common::TestParameters::printArgs(os);
+ os << std::endl;
+
+ JournalParameters::printArgs(os);
+ os << std::endl;
+}
+
+bool
+readArgs(int argc,
+ char** argv,
+ tests::storePerftools::common::TestParameters& tp,
+ JournalParameters& jp)
+{
+ /// \todo TODO: At some point, find an easy way to aggregate these from JrnlPerfTestParameters and JrnlParameters themselves.
+ static struct option long_options[] = {
+ {"help", no_argument, 0, 'h'},
+ {"version", no_argument, 0, 'v'},
+
+ // Test params
+ {"num_msgs", required_argument, 0, 'm'},
+ {"msg_size", required_argument, 0, 'S'},
+ {"num_queues", required_argument, 0, 'q'},
+ {"num_enq_threads_per_queue", required_argument, 0, 'e'},
+ {"num_deq_threads_per_queue", required_argument, 0, 'd'},
+
+ // Journal params
+ {"jrnl_dir", required_argument, 0, 'j'},
+ {"jrnl_base_filename", required_argument, 0, 'b'},
+ {"num_jfiles", required_argument, 0, 'f'},
+ {"jfsize_sblks", required_argument, 0, 's'},
+ {"wcache_num_pages", required_argument, 0, 'p'},
+ {"wcache_pgsize_sblks", required_argument, 0, 'c'},
+
+ {0, 0, 0, 0}
+ };
+
+ bool err = false;
+ bool ver = false;
+ int c = 0;
+ while (true) {
+ int option_index = 0;
+ std::ostringstream oss;
+ oss << "hv" << tests::storePerftools::common::TestParameters::shortArgs() << JournalParameters::shortArgs();
+ c = getopt_long(argc, argv, oss.str().c_str(), long_options, &option_index);
+ if (c == -1) break;
+ if (c == 'v') {
+ std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl;
+ ver = true;
+ break;
+ }
+ err = !(tp.parseArg(c, optarg) || jp.parseArg(c, optarg));
+ }
+ if (err) {
+ std::cout << std::endl;
+ printArgs();
+ }
+ return err || ver;
+}
+
+}}} // namespace tests::storePerftools::jrnlPerf
+
+// -----------------------------------------------------------------
+
+int
+main(int argc, char** argv)
+{
+ tests::storePerftools::common::TestParameters tp;
+ tests::storePerftools::jrnlPerf::JournalParameters jp;
+ if (tests::storePerftools::jrnlPerf::readArgs(argc, argv, tp, jp)) return 1;
+ tests::storePerftools::jrnlPerf::PerfTest jpt(tp, jp);
+ jpt.run();
+ std::cout << jpt << std::endl;
+ return 0;
+}
diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h b/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h
new file mode 100644
index 0000000000..b105bc0488
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file PerfTest.h
+ */
+
+#ifndef tests_storePerfTools_jrnlPerf_PerfTest_h_
+#define tests_storePerfTools_jrnlPerf_PerfTest_h_
+
+#include "TestResult.h"
+#include "tests/storePerfTools/common/Streamable.h"
+
+#include <vector>
+
+namespace tests {
+namespace storePerftools {
+namespace common {
+class TestParameters;
+}
+namespace jrnlPerf {
+
+class Journal;
+class JournalParameters;
+
+/**
+ * \brief Main test class; Create an instance and execute run()
+ *
+ * Main test class which aggregates the components of a test.
+ */
+class PerfTest : public tests::storePerftools::common::Streamable
+{
+public:
+ /**
+ * \brief Constructor
+ *
+ * \param tp Test parameters for the test
+ * \param jp Journal parameters for all queues (journals) in the test
+ */
+ PerfTest(const tests::storePerftools::common::TestParameters& tp,
+ const JournalParameters& jp);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~PerfTest();
+
+ /**
+ * \brief Runs the test and prints out the results.
+ *
+ * Runs the test as set by the test parameters and journal parameters.
+ */
+ void run();
+
+ /**
+ * \brief Stream the test setup and results to an output stream
+ *
+ * Convenience feature which streams the test setup and results to an output stream.
+ *
+ * \param os Output stream to which the test setup and results are to be streamed.
+ */
+ void toStream(std::ostream& os = std::cout) const;
+
+protected:
+ const tests::storePerftools::common::TestParameters& m_testParams; ///< Ref to a struct containing test params
+ const JournalParameters& m_jrnlParams; ///< Ref to a struct containing the journal parameters
+ TestResult m_testResult; ///< Journal performance object
+ const char* m_msgData; ///< Pointer to msg data, which is the same for all messages
+
+ /**
+ * \brief Creates journals and JrnlInstance classes for all journals (queues) to be tested
+ *
+ * Creates a new journal instance and JrnlInstance instance for each queue. The journals are initialized
+ * which creates a new set of journal files on the local storage media (which is determined by path in
+ * JrnlParameters._jrnlDir). This activity is not timed, and is not a part of the performance test per se.
+ *
+ * \param jrnlList List which will be filled with pointers to the newly prepared journals
+ */
+ void prepareJournals(std::vector<Journal*>& jrnlList);
+
+ /**
+ * \brief Destroy the journal instances in list jrnlList
+ *
+ * \param jrnlList List of pointers to journals to be destroyed
+ */
+ void destroyJournals(std::vector<Journal*>& jrnlList);
+
+};
+
+/**
+ * \brief Print out the program arguments
+ *
+ * Print out the arguments to the performance program if requested by help or a parameter error.
+ *
+ * \param os Stream to which the arguments should be streamed.
+ */
+void printArgs(std::ostream& os = std::cout);
+
+/**
+ * \brief Process the command-line arguments
+ *
+ * Process the command-line arguments and populate the JrnlPerfTestParameters and JrnlParameters structs. Only the
+ * arguments supplied are on the command-line are changed in these structs, the others remain unchanged. It is
+ * important therefore to make sure that defaults are pre-loaded (the default behavior of the default constructors
+ * for these structs).
+ *
+ * \param argc Number of command-line arguments. Process directly from main().
+ * \param argv Pointer to array of command-line argument pointers. Process directly from main().
+ * \param tp Reference to test parameter object. Only params on the command-line are changed.
+ * \param jp Reference to journal parameter object. Only params on the command-line are changed.
+ */
+bool readArgs(int argc,
+ char** argv,
+ tests::storePerftools::common::TestParameters& tp,
+ JournalParameters& jp);
+
+}}} // namespace tests::storePerftools::jrnlPerf
+
+#endif // tests_storePerfTools_jrnlPerf_PerfTest_h_
diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp
new file mode 100644
index 0000000000..9fe214726d
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestResult.cpp
+ */
+
+#include "TestResult.h"
+
+#include <stdint.h> // uint32_t
+
+namespace tests {
+namespace storePerftools {
+namespace jrnlPerf {
+
+TestResult::TestResult(const tests::storePerftools::common::TestParameters& tp) :
+ tests::storePerftools::common::TestResult(),
+ m_testParams(tp)
+{}
+
+TestResult::~TestResult()
+{}
+
+void
+TestResult::toStream(std::ostream& os) const
+{
+ double msgsRate;
+ os << "TEST RESULTS:" << std::endl;
+ os << " Msgs per thread: " << m_testParams.m_numMsgs << std::endl;
+ os << " Msg size: " << m_testParams.m_msgSize << std::endl;
+ os << " No. queues: " << m_testParams.m_numQueues << std::endl;
+ os << " No. enq threads/queue: " << m_testParams.m_numEnqThreadsPerQueue << std::endl;
+ os << " No. deq threads/queue: " << m_testParams.m_numDeqThreadsPerQueue << std::endl;
+ os << " Time taken: " << m_elapsed << " sec" << std::endl;
+ uint32_t msgsPerQueue = m_testParams.m_numMsgs * m_testParams.m_numEnqThreadsPerQueue;
+ if (m_testParams.m_numQueues > 1) {
+ msgsRate = double(msgsPerQueue) / m_elapsed;
+ os << " No. msgs per queue: " << msgsPerQueue << std::endl;
+ os << "Per queue msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl;
+ os << " " << (msgsRate * m_testParams.m_msgSize / 1e6) << " MB/sec" << std::endl;
+ }
+ uint32_t totalMsgs = msgsPerQueue * m_testParams.m_numQueues;
+ msgsRate = double(totalMsgs) / m_elapsed;
+ os << " Total no. msgs: " << totalMsgs << std::endl;
+ os << " Broker msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl;
+ os << " " << (msgsRate * m_testParams.m_msgSize / 1e6) << " MB/sec" << std::endl;
+}
+
+}}} // namespace tests::storePerftools::jrnlPerf
diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h b/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h
new file mode 100644
index 0000000000..dae09a6032
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TestResult.h
+ */
+
+#ifndef tests_storePerfTools_jrnlPerf_TestResult_h_
+#define tests_storePerfTools_jrnlPerf_TestResult_h_
+
+#include "tests/storePerfTools/common/TestParameters.h"
+#include "tests/storePerfTools/common/TestResult.h"
+
+namespace tests {
+namespace storePerftools {
+namespace jrnlPerf {
+
+class TestOptions;
+
+/**
+ * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal.
+ *
+ * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the
+ * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the
+ * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput.
+ * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor.
+ *
+ * Results are available through the use of toStream(), toString() or the << operators.
+ *
+ * Output is in the following format:
+ * <pre>
+ * TEST RESULTS:
+ * Msgs per thread: 10000
+ * Msg size: 2048
+ * No. queues: 2
+ * No. threads/queue: 2
+ * Time taken: 1.6626 sec
+ * Total no. msgs: 40000
+ * Msg throughput: 24.0587 kMsgs/sec
+ * 49.2723 MB/sec
+ * </pre>
+ */
+class TestResult : public tests::storePerftools::common::TestResult
+{
+public:
+ /**
+ * \brief Constructor
+ *
+ * Constructor. Will start the time interval measurement.
+ *
+ * \param tp Test parameter details used to calculate the performance results.
+ */
+ TestResult(const tests::storePerftools::common::TestParameters& tp);
+
+ /**
+ * \brief Virtual destructor
+ */
+ virtual ~TestResult();
+
+ /**
+ * \brief Stream the performance test results to an output stream
+ *
+ * Convenience feature which streams a multi-line performance result an output stream.
+ *
+ * \param os Output stream to which the results are to be streamed
+ */
+ void toStream(std::ostream& os = std::cout) const;
+
+protected:
+ tests::storePerftools::common::TestParameters m_testParams; ///< Test parameters used for performance calculations
+
+};
+
+}}} // namespace tests::storePerftools::jrnlPerf
+
+#endif // tests_storePerfTools_jrnlPerf_TestResult_h_
diff --git a/cpp/src/tests/storePerfTools/version.h b/cpp/src/tests/storePerfTools/version.h
new file mode 100644
index 0000000000..311b145330
--- /dev/null
+++ b/cpp/src/tests/storePerfTools/version.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file version.h
+ */
+
+#ifndef tests_storePerftools_version_h_
+#define tests_storePerftools_version_h_
+
+#include <iostream>
+#include <sstream>
+
+namespace tests {
+namespace storePerftools {
+
+static const int versionMajor = 0;
+static const int versionMinor = 0;
+static const int versionRevision = 1;
+
+std::string name() {
+ return "Qpid async store perftools";
+}
+
+std::string version() {
+ std::ostringstream oss;
+ oss << versionMajor << "." << versionMinor << "." << versionRevision;
+ return oss.str();
+}
+
+}} // namespace tests::perftools
+
+#endif // tests_storePerftools_version_h_