diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-05-09 14:19:28 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-09 14:19:28 +0000 |
commit | 2e3690efe94e6161d129ebce2f8c22cb25819ec1 (patch) | |
tree | 8fc2b2aa701127d7ee827319b809aeaf543f2a59 /cpp/src/tests | |
parent | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (diff) | |
download | qpid-python-2e3690efe94e6161d129ebce2f8c22cb25819ec1.tar.gz |
QPID-3858: Initial checkin of async interface implementation and test harness.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1336220 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
42 files changed, 4545 insertions, 0 deletions
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 5979ce42ae..965e4b2b38 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -343,3 +343,74 @@ add_library (dlclose_noop MODULE dlclose_noop.c) #EXTRA_DIST+=$(LONG_TESTS) run_perftest #check-long: # $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND= + + +# Async Store perf tests +# ---------------------- + +# New journal perf test (jrnl2Perf) +set (jrnl2Perf_SOURCES + storePerfTools/jrnlPerf/Journal.cpp + storePerfTools/jrnlPerf/JournalParameters.cpp + storePerfTools/jrnlPerf/PerfTest.cpp + storePerfTools/jrnlPerf/TestResult.cpp + + storePerfTools/common/Parameters.cpp + storePerfTools/common/PerftoolError.cpp + storePerfTools/common/ScopedTimable.cpp + storePerfTools/common/ScopedTimer.cpp + storePerfTools/common/Streamable.cpp + storePerfTools/common/TestParameters.cpp + storePerfTools/common/TestResult.cpp + storePerfTools/common/Thread.cpp +) + +if (UNIX) + add_executable (jrnl2Perf ${jrnl2Perf_SOURCES}) + set_target_properties (jrnl2Perf PROPERTIES + COMPILE_FLAGS "-DJOURNAL2" + LINK_FLAGS "-L${QPID_BUILD_DIR}/src" + ) + target_link_libraries (jrnl2Perf + asyncStore + qpidbroker + rt + ) +endif (UNIX) + +# Async store perf test (asyncPerf) +set (asyncPerf_SOURCES + storePerfTools/asyncPerf/MockPersistableMessage.cpp + storePerfTools/asyncPerf/MockPersistableQueue.cpp + storePerfTools/asyncPerf/MockTransactionContext.cpp + storePerfTools/asyncPerf/PerfTest.cpp + storePerfTools/asyncPerf/QueuedMessage.cpp + storePerfTools/asyncPerf/TestOptions.cpp + storePerfTools/asyncPerf/TestResult.cpp + + storePerfTools/common/Parameters.cpp + storePerfTools/common/PerftoolError.cpp + storePerfTools/common/ScopedTimable.cpp + storePerfTools/common/ScopedTimer.cpp + storePerfTools/common/Streamable.cpp + storePerfTools/common/TestOptions.cpp + storePerfTools/common/TestParameters.cpp + storePerfTools/common/TestResult.cpp + storePerfTools/common/Thread.cpp +) + +if (UNIX) + add_executable (asyncPerf ${asyncPerf_SOURCES}) + set_target_properties (asyncPerf PROPERTIES + COMPILE_FLAGS "-DJOURNAL2" + LINK_FLAGS "-L${QPID_BUILD_DIR}/src" + ) + target_link_libraries (asyncPerf + boost_program_options + asyncStore + qpidbroker + qpidcommon + qpidtypes + rt + ) +endif (UNIX) diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp new file mode 100644 index 0000000000..75fc921494 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableMessage.cpp + */ + +#include "MockPersistableMessage.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// --- Inner class Queue::MessageContext --- + +MockPersistableMessage::MessageContext::MessageContext(MockPersistableMessagePtr msg, + const qpid::asyncStore::AsyncOperation::opCode op, + MockPersistableQueue* q) : + m_msg(msg), + m_op(op), + m_q(q) +{} + +MockPersistableMessage::MessageContext::~MessageContext() +{} + +const char* +MockPersistableMessage::MessageContext::getOp() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +void +MockPersistableMessage::MessageContext::destroy() +{ + delete this; +} + +// --- Class MockPersistableMessage --- + + +MockPersistableMessage::MockPersistableMessage(const char* msgData, + const uint32_t msgSize, + AsyncStoreImplPtr store, + const bool persistent) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast<size_t>(msgSize)), + m_persistent(persistent), + m_msgHandle(store->createMessageHandle(this)) +{} + +MockPersistableMessage::~MockPersistableMessage() +{} + +// static +void +MockPersistableMessage::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc) +{ + if (bc) { + MessageContext* mc = dynamic_cast<MessageContext*>(bc); + if (mc->m_msg) { + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Message pid=0x" << std::hex << mc->m_msg->m_persistenceId << std::dec << ": Operation " + << mc->getOp() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(mc->m_op) { + case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: + mc->m_msg->dequeueComplete(mc); + break; + case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: + mc->m_msg->enqueueComplete(mc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::MockPersistableMessage::handleAsyncResult(): Unknown async queue operation: " << mc->m_op; + throw qpid::Exception(oss.str()); + }; + } + } + } + if (bc) delete bc; + if (res) delete res; +} + +qpid::broker::MessageHandle& +MockPersistableMessage::getHandle() +{ + return m_msgHandle; +} + +void +MockPersistableMessage::setPersistenceId(uint64_t id) const +{ + m_persistenceId = id; +} + +uint64_t +MockPersistableMessage::getPersistenceId() const +{ + return m_persistenceId; +} + +void +MockPersistableMessage::encode(qpid::framing::Buffer& buffer) const +{ + buffer.putRawData(m_msg); +} + +uint32_t +MockPersistableMessage::encodedSize() const +{ + return static_cast<uint32_t>(m_msg.size()); +} + +void +MockPersistableMessage::allDequeuesComplete() +{} + +uint32_t +MockPersistableMessage::encodedHeaderSize() const +{ + return 0; +} + +bool +MockPersistableMessage::isPersistent() const +{ + return m_persistent; +} + +uint64_t +MockPersistableMessage::getSize() +{ + return m_msg.size(); +} + +void +MockPersistableMessage::write(char* target) +{ + ::memcpy(target, m_msg.data(), m_msg.size()); +} + +// protected +void +MockPersistableMessage::enqueueComplete(const MessageContext* /*mc*/) +{ +//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl; +} + +// protected +void +MockPersistableMessage::dequeueComplete(const MessageContext* /*mc*/) +{ +//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h new file mode 100644 index 0000000000..a139328690 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableMessage.h + */ + +#ifndef tests_storePerfTools_asyncPerf_MockPersistableMessage_h_ +#define tests_storePerfTools_asyncPerf_MockPersistableMessage_h_ + +#include "qpid/asyncStore/AsyncOperation.h" +#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/BrokerContext.h" +#include "qpid/broker/MessageHandle.h" +#include "qpid/broker/PersistableMessage.h" + +#include <stdint.h> // uint32_t + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockPersistableMessage; +class MockPersistableQueue; + +typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr; +typedef boost::shared_ptr<MockPersistableMessage> MockPersistableMessagePtr; + +class MockPersistableMessage: public qpid::broker::PersistableMessage, qpid::broker::DataSource +{ +public: + class MessageContext : public qpid::broker::BrokerContext + { + public: + MessageContext(MockPersistableMessagePtr msg, + const qpid::asyncStore::AsyncOperation::opCode op, + MockPersistableQueue* q); + virtual ~MessageContext(); + const char* getOp() const; + void destroy(); + MockPersistableMessagePtr m_msg; + const qpid::asyncStore::AsyncOperation::opCode m_op; + MockPersistableQueue* m_q; + }; + + MockPersistableMessage(const char* msgData, + const uint32_t msgSize, + AsyncStoreImplPtr store, + const bool persistent = true); + virtual ~MockPersistableMessage(); + static void handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc); + qpid::broker::MessageHandle& getHandle(); + + // Interface Persistable + virtual void setPersistenceId(uint64_t id) const; + virtual uint64_t getPersistenceId() const; + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + + // Interface PersistableMessage + virtual void allDequeuesComplete(); + virtual uint32_t encodedHeaderSize() const; + virtual bool isPersistent() const; + + // Interface DataStore + virtual uint64_t getSize(); + virtual void write(char* target); + +protected: + mutable uint64_t m_persistenceId; + const std::string m_msg; + const bool m_persistent; + qpid::broker::MessageHandle m_msgHandle; + + // --- Ascnc op completions (called through handleAsyncResult) --- + void enqueueComplete(const MessageContext* mc); + void dequeueComplete(const MessageContext* mc); + +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_MockPersistableMessage_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp new file mode 100644 index 0000000000..ff9b76c421 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableQueue.cpp + */ + +#include "MockPersistableQueue.h" + +#include "MockPersistableMessage.h" +#include "MockTransactionContext.h" +#include "QueuedMessage.h" +#include "TestOptions.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/EnqueueHandle.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// --- Inner class MockPersistableQueue::QueueContext --- + +MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueue* q, + const qpid::asyncStore::AsyncOperation::opCode op) : + m_q(q), + m_op(op) +{} + +MockPersistableQueue::QueueContext::~QueueContext() +{} + +const char* +MockPersistableQueue::QueueContext::getOp() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +void +MockPersistableQueue::QueueContext::destroy() +{ + delete this; +} + +// --- Class MockPersistableQueue --- + +MockPersistableQueue::MockPersistableQueue(const std::string& name, + const qpid::framing::FieldTable& /*args*/, + AsyncStoreImplPtr store, + const TestOptions& to, + const char* msgData) : + m_name(name), + m_store(store), + m_persistenceId(0ULL), + m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. + m_perfTestOpts(to), + m_msgData(msgData) +{ + const qpid::types::Variant::Map qo; + m_queueHandle = m_store->createQueueHandle(m_name, qo); + qpid::broker::BrokerContext* bc = new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE); + m_store->submitCreate(m_queueHandle, + dynamic_cast<const qpid::broker::DataSource*>(this), + &handleAsyncResult, + bc); +} + +MockPersistableQueue::~MockPersistableQueue() +{ +// m_store->flush(*this); + // TODO: Make destroying the store a test parameter +// m_store->destroy(*this); +// m_store = 0; +} + +// static +void +MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc) +{ + if (bc && res) { + QueueContext* qc = dynamic_cast<QueueContext*>(bc); + if (qc->m_q) { + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Queue name=\"" << qc->m_q->m_name << "\": Operation " << qc->getOp() << ": failure " + << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(qc->m_op) { + case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: + qc->m_q->createComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: + qc->m_q->flushComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: + qc->m_q->destroyComplete(qc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->m_op; + throw qpid::Exception(oss.str()); + }; + } + } + } + if (bc) delete bc; + if (res) delete res; +} + +qpid::broker::QueueHandle& +MockPersistableQueue::getHandle() +{ + return m_queueHandle; +} + +void* +MockPersistableQueue::runEnqueues() +{ + uint32_t numMsgs = 0; + uint16_t txnCnt = 0; + const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0; + MockTransactionContextPtr txn; + while (numMsgs < m_perfTestOpts.m_numMsgs) { + if (useTxn && txnCnt == 0) { + txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() + } + MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true)); + msg->setPersistenceId(m_store->getNextRid()); + qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle); + MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, this); + if (useTxn) { + m_store->submitEnqueue(enqHandle, + txn->getHandle(), + &MockPersistableMessage::handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(msgCtxt)); + } else { + m_store->submitEnqueue(enqHandle, + &MockPersistableMessage::handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(msgCtxt)); + } + QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn)); + push(qm); +//std::cout << "**** push 0x" << std::hex << msg->getPersistenceId() << std::dec << std::endl; + if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) { + txn->commit(); + txnCnt = 0; + } + ++numMsgs; + } + if (txnCnt > 0) { + txn->commit(); + txnCnt = 0; + } + return 0; +} + +void* +MockPersistableQueue::runDequeues() +{ + uint32_t numMsgs = 0; + const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue; + uint16_t txnCnt = 0; + const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0; + MockTransactionContextPtr txn; + QueuedMessagePtr qm; + while (numMsgs < numMsgsToDequeue) { + if (useTxn && txnCnt == 0) { + txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() + } + pop(qm); + if (qm.get()) { + qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle(); + qpid::broker::BrokerContext* bc = new MockPersistableMessage::MessageContext(qm->getMessage(), + qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, + this); + if (useTxn) { + m_store->submitDequeue(enqHandle, + txn->getHandle(), + &MockPersistableMessage::handleAsyncResult, + bc); + } else { + m_store->submitDequeue(enqHandle, + &MockPersistableMessage::handleAsyncResult, + bc); + } + ++numMsgs; + qm.reset(static_cast<QueuedMessage*>(0)); + if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) { + txn->commit(); + txnCnt = 0; + } + } else { +// ::usleep(100); // 0.1 ms TODO: Use a condition variable instead of sleeping/spinning + } + } + if (txnCnt > 0) { + txn->commit(); + txnCnt = 0; + } + return 0; +} + +//static +void* +MockPersistableQueue::startEnqueues(void* ptr) +{ + return reinterpret_cast<MockPersistableQueue*>(ptr)->runEnqueues(); +} + +//static +void* +MockPersistableQueue::startDequeues(void* ptr) +{ + return reinterpret_cast<MockPersistableQueue*>(ptr)->runDequeues(); +} + +void +MockPersistableQueue::encode(qpid::framing::Buffer& buffer) const +{ + buffer.putShortString(m_name); +} + +uint32_t +MockPersistableQueue::encodedSize() const +{ + return m_name.size() + 1; +} + +uint64_t +MockPersistableQueue::getPersistenceId() const +{ + return m_persistenceId; +} + +void +MockPersistableQueue::setPersistenceId(uint64_t persistenceId) const +{ + m_persistenceId = persistenceId; +} + +void +MockPersistableQueue::flush() +{ + //if(m_store) m_store->flush(*this); +} + +const std::string& +MockPersistableQueue::getName() const +{ + return m_name; +} + +void +MockPersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) +{ + if (externalQueueStore != inst && externalQueueStore) + delete externalQueueStore; + externalQueueStore = inst; +} + +uint64_t +MockPersistableQueue::getSize() +{ + return m_persistableData.size(); +} + +void +MockPersistableQueue::write(char* target) +{ + ::memcpy(target, m_persistableData.data(), m_persistableData.size()); +} + +// protected +void +MockPersistableQueue::createComplete(const QueueContext* /*qc*/) +{ +//std::cout << "~~~~~ Queue name=\"" << m_name << "\": createComplete()" << std::endl; +} + +// protected +void +MockPersistableQueue::flushComplete(const QueueContext* /*qc*/) +{ +//std::cout << "~~~~~ Queue name=\"" << m_name << "\": flushComplete()" << std::endl; +} + +// protected +void +MockPersistableQueue::destroyComplete(const QueueContext* /*qc*/) +{ +//std::cout << "~~~~~ Queue name=\"" << m_name << "\": destroyComplete()" << std::endl; +} + +// protected +void +MockPersistableQueue::push(QueuedMessagePtr& qm) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); + m_enqueuedMsgs.push_back(qm); + m_dequeueCondition.notify(); +} + +// protected +void +MockPersistableQueue::pop(QueuedMessagePtr& qm) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); + if (m_enqueuedMsgs.empty()) { + m_dequeueCondition.wait(m_enqueuedMsgsMutex); + } + qm = m_enqueuedMsgs.front(); + if (qm->isTransactional()) { + // The next msg is still in an open transaction, skip and find next non-open-txn msg + MsgEnqListItr i = m_enqueuedMsgs.begin(); + while (++i != m_enqueuedMsgs.end()) { + if (!(*i)->isTransactional()) { + qm = *i; + m_enqueuedMsgs.erase(i); + } + } + } else { + // The next msg is not in an open txn + m_enqueuedMsgs.pop_front(); + } +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h new file mode 100644 index 0000000000..a77f41743c --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableQueue.h + */ + +#ifndef tests_storePerfTools_asyncPerf_MockPersistableQueue_h_ +#define tests_storePerfTools_asyncPerf_MockPersistableQueue_h_ + +#include "qpid/asyncStore/AsyncOperation.h" +#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/BrokerContext.h" +#include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/QueueHandle.h" +#include "qpid/sys/Condition.h" +#include "qpid/sys/Mutex.h" + +#include <boost/shared_ptr.hpp> +#include <deque> + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +} +namespace framing { +class FieldTable; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class QueuedMessage; +class TestOptions; + +typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr; +typedef boost::shared_ptr<QueuedMessage> QueuedMessagePtr; + +class MockPersistableQueue : public qpid::broker::PersistableQueue, qpid::broker::DataSource +{ +public: + class QueueContext : public qpid::broker::BrokerContext + { + public: + QueueContext(MockPersistableQueue* q, + const qpid::asyncStore::AsyncOperation::opCode op); + virtual ~QueueContext(); + const char* getOp() const; + void destroy(); + MockPersistableQueue* m_q; + const qpid::asyncStore::AsyncOperation::opCode m_op; + }; + + MockPersistableQueue(const std::string& name, + const qpid::framing::FieldTable& args, + AsyncStoreImplPtr store, + const TestOptions& perfTestParams, + const char* msgData); + virtual ~MockPersistableQueue(); + + // --- Async functionality --- + static void handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc); + qpid::broker::QueueHandle& getHandle(); + + // --- Performance test thread entry points --- + void* runEnqueues(); + void* runDequeues(); + static void* startEnqueues(void* ptr); + static void* startDequeues(void* ptr); + + // --- Interface qpid::broker::Persistable --- + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + virtual uint64_t getPersistenceId() const; + virtual void setPersistenceId(uint64_t persistenceId) const; + + // --- Interface qpid::broker::PersistableQueue --- + virtual void flush(); + virtual const std::string& getName() const; + virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); + + // --- Interface DataStore --- + virtual uint64_t getSize(); + virtual void write(char* target); + +protected: + const std::string m_name; + AsyncStoreImplPtr m_store; + mutable uint64_t m_persistenceId; + std::string m_persistableData; + qpid::broker::QueueHandle m_queueHandle; + + // Test params + const TestOptions& m_perfTestOpts; + const char* m_msgData; + + typedef std::deque<QueuedMessagePtr> MsgEnqList; + typedef MsgEnqList::iterator MsgEnqListItr; + MsgEnqList m_enqueuedMsgs; + qpid::sys::Mutex m_enqueuedMsgsMutex; + qpid::sys::Condition m_dequeueCondition; + + // --- Ascnc op completions (called through handleAsyncResult) --- + void createComplete(const QueueContext* qc); + void flushComplete(const QueueContext* qc); + void destroyComplete(const QueueContext* qc); + + // --- Queue functionality --- + void push(QueuedMessagePtr& msg); + void pop(QueuedMessagePtr& msg); +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_MockPersistableQueue_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp new file mode 100644 index 0000000000..e564876daa --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockTransactionContext.cpp + */ + +#include "MockTransactionContext.h" + +#include "QueuedMessage.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// --- Inner class MockTransactionContext::QueueContext --- + +MockTransactionContext::TransactionContext::TransactionContext(MockTransactionContext* tc, + const qpid::asyncStore::AsyncOperation::opCode op) : + m_tc(tc), + m_op(op) +{} + +MockTransactionContext::TransactionContext::~TransactionContext() +{} + +const char* +MockTransactionContext::TransactionContext::getOp() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +void +MockTransactionContext::TransactionContext::destroy() +{ + delete this; +} + +// --- Class MockTransactionContext --- + + +MockTransactionContext::MockTransactionContext(AsyncStoreImplPtr store, + const std::string& xid) : + m_store(store), + m_txnHandle(store->createTxnHandle(xid)), + m_prepared(false), + m_enqueuedMsgs() +{ +//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; +} + +MockTransactionContext::~MockTransactionContext() +{} + +// static +void +MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc) +{ + if (bc && res) { + TransactionContext* tc = dynamic_cast<TransactionContext*>(bc); + if (tc->m_tc) { + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Transaction xid=\"" << tc->m_tc->getXid() << "\": Operation " << tc->getOp() << ": failure " + << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(tc->m_op) { + case qpid::asyncStore::AsyncOperation::TXN_PREPARE: + tc->m_tc->prepareComplete(tc); + break; + case qpid::asyncStore::AsyncOperation::TXN_COMMIT: + tc->m_tc->commitComplete(tc); + break; + case qpid::asyncStore::AsyncOperation::TXN_ABORT: + tc->m_tc->abortComplete(tc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::MockTransactionContext::handleAsyncResult(): Unknown async operation: " << tc->m_op; + throw qpid::Exception(oss.str()); + }; + } + } + } + if (bc) delete bc; + if (res) delete res; +} + +qpid::broker::TxnHandle& +MockTransactionContext::getHandle() +{ + return m_txnHandle; +} + +bool +MockTransactionContext::is2pc() const +{ + return m_txnHandle.is2pc(); +} + +const std::string& +MockTransactionContext::getXid() const +{ + return m_txnHandle.getXid(); +} + +void +MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); + m_enqueuedMsgs.push_back(qm); +} + +void +MockTransactionContext::prepare() +{ + if (m_txnHandle.is2pc()) { + localPrepare(); + m_prepared = true; + } + std::ostringstream oss; + oss << "MockTransactionContext::prepare(): xid=\"" << getXid() + << "\": Transaction Error: called prepare() on local transaction"; + throw qpid::Exception(oss.str()); +} + +void +MockTransactionContext::abort() +{ + // TODO: Check the following XA transaction semantics: + // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared. + if (!m_prepared) { + localPrepare(); + } + m_store->submitAbort(m_txnHandle, + &handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); +//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + +void +MockTransactionContext::commit() +{ + if (is2pc()) { + if (!m_prepared) { + std::ostringstream oss; + oss << "MockTransactionContext::abort(): xid=\"" << getXid() + << "\": Transaction Error: called commit() without prepare() on 2PC transaction"; + throw qpid::Exception(oss.str()); + } + } else { + localPrepare(); + } + m_store->submitCommit(m_txnHandle, + &handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); +//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + + +// protected +void +MockTransactionContext::localPrepare() +{ + m_store->submitPrepare(m_txnHandle, + &handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); +//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + +// protected +void +MockTransactionContext::prepareComplete(const TransactionContext* /*tc*/) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); + while (!m_enqueuedMsgs.empty()) { + m_enqueuedMsgs.front()->clearTransaction(); + m_enqueuedMsgs.pop_front(); + } +//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": prepareComplete()" << std::endl; +} + + +// protected +void +MockTransactionContext::abortComplete(const TransactionContext* /*tc*/) +{ +//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": abortComplete()" << std::endl; +} + + +// protected +void +MockTransactionContext::commitComplete(const TransactionContext* /*tc*/) +{ +//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": commitComplete()" << std::endl; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h new file mode 100644 index 0000000000..35de7374c5 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockTransactionContext.h + */ + +#ifndef tests_storePerfTools_asyncPerf_MockTransactionContext_h_ +#define tests_storePerfTools_asyncPerf_MockTransactionContext_h_ + +#include "qpid/asyncStore/AsyncOperation.h" + +#include "qpid/broker/BrokerContext.h" +#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext +#include "qpid/broker/TxnHandle.h" +#include "qpid/sys/Mutex.h" + +#include <boost/shared_ptr.hpp> +#include <deque> + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class QueuedMessage; + +typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr; + +class MockTransactionContext : public qpid::broker::TransactionContext +{ +public: + // NOTE: TransactionContext - Bad naming? This context is the async return handling context for class + // MockTransactionContext async ops. Other classes using this pattern simply use XXXContext for this class + // (e.g. QueueContext in MockPersistableQueue), but in this case it may be confusing. + class TransactionContext : public qpid::broker::BrokerContext + { + public: + TransactionContext(MockTransactionContext* tc, + const qpid::asyncStore::AsyncOperation::opCode op); + virtual ~TransactionContext(); + const char* getOp() const; + void destroy(); + MockTransactionContext* m_tc; + const qpid::asyncStore::AsyncOperation::opCode m_op; + }; + + MockTransactionContext(AsyncStoreImplPtr store, + const std::string& xid = std::string()); + virtual ~MockTransactionContext(); + static void handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc); + + qpid::broker::TxnHandle& getHandle(); + bool is2pc() const; + const std::string& getXid() const; + void addEnqueuedMsg(QueuedMessage* qm); + + void prepare(); + void abort(); + void commit(); + +protected: + AsyncStoreImplPtr m_store; + qpid::broker::TxnHandle m_txnHandle; + bool m_prepared; + std::deque<QueuedMessage*> m_enqueuedMsgs; + qpid::sys::Mutex m_enqueuedMsgsMutex; + + void localPrepare(); + + // --- Ascnc op completions (called through handleAsyncResult) --- + void prepareComplete(const TransactionContext* tc); + void abortComplete(const TransactionContext* tc); + void commitComplete(const TransactionContext* tc); + +}; + +}}} // namespace tests:storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_MockTransactionContext_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp new file mode 100644 index 0000000000..fe66604774 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerfTest.cpp + */ + +#include "PerfTest.h" + +#include "MockPersistableQueue.h" + +#include "tests/storePerfTools/version.h" +#include "tests/storePerfTools/common/ScopedTimer.h" +#include "tests/storePerfTools/common/Thread.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +#include <iomanip> + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +PerfTest::PerfTest(const TestOptions& to, + const qpid::asyncStore::AsyncStoreOptions& aso) : + m_testOpts(to), + m_storeOpts(aso), + m_testResult(to), + m_msgData(new char[to.m_msgSize]), + m_poller(new qpid::sys::Poller), + m_pollingThread(m_poller.get()) +{ + std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize); +} + +PerfTest::~PerfTest() +{ + m_poller->shutdown(); + m_pollingThread.join(); + delete[] m_msgData; +} + +AsyncStoreImplPtr +PerfTest::prepareStore() +{ + AsyncStoreImplPtr store(new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts)); + store->initialize(); + return store; +} + +void +PerfTest::prepareQueues(std::deque<MockPersistableQueuePtr>& jrnlList, + AsyncStoreImplPtr store) +{ + for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { + std::ostringstream qname; + qname << "queue_" << std::setw(4) << std::setfill('0') << i; + MockPersistableQueuePtr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, store, m_testOpts, m_msgData)); + jrnlList.push_back(mpq); + } +} + +void +PerfTest::destroyQueues(std::deque<MockPersistableQueuePtr>& jrnlList) +{ + jrnlList.clear(); +} + +void +PerfTest::run() +{ + typedef boost::shared_ptr<tests::storePerftools::common::Thread> ThreadPtr; // TODO - replace with qpid threads + + AsyncStoreImplPtr store = prepareStore(); + + std::deque<MockPersistableQueuePtr> queueList; + prepareQueues(queueList, store); + + std::deque<ThreadPtr> threads; + { // --- Start of timed section --- + tests::storePerftools::common::ScopedTimer st(m_testResult); + + for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) { + for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads + ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startEnqueues, + reinterpret_cast<void*>(queueList[q].get()))); + threads.push_back(tp); + } + for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads + ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startDequeues, + reinterpret_cast<void*>(queueList[q].get()))); + threads.push_back(tp); + } + } + while (threads.size()) { + threads.front()->join(); + threads.pop_front(); + } + } // --- End of timed section --- + + destroyQueues(queueList); +// DEBUG MEASURE - REMOVE WHEN FIXED +//::sleep(2); +} + +void +PerfTest::toStream(std::ostream& os) const +{ + m_testOpts.printVals(os); + os << std::endl; + m_storeOpts.printVals(os); + os << std::endl; + os << m_testResult << std::endl; +} + +}}} // namespace tests::storePerftools::asyncPerf + +// ----------------------------------------------------------------- + +int +main(int argc, char** argv) +{ + qpid::CommonOptions co; + qpid::asyncStore::AsyncStoreOptions aso; + tests::storePerftools::asyncPerf::TestOptions to; + qpid::Options opts; + opts.add(co).add(aso).add(to); + try { + opts.parse(argc, argv); + aso.validate(); + to.validate(); + } + catch (std::exception& e) { + std::cerr << e.what() << std::endl; + return 1; + } + + // Handle options that just print information then exit. + if (co.version) { + std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl; + return 0; + } + if (co.help) { + std::cout << tests::storePerftools::name() << ": asyncPerf" << std::endl; + std::cout << "Performance test for the async store through the qpid async store interface." << std::endl; + std::cout << "Usage: asyncPerf [options]" << std::endl; + std::cout << opts << std::endl; + return 0; + } + + // Create and start test + tests::storePerftools::asyncPerf::PerfTest apt(to, aso); + apt.run(); + + // Print test result + std::cout << apt << std::endl; + ::sleep(1); + return 0; +} diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h new file mode 100644 index 0000000000..1eb11a51fa --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerfTest.h + */ + +#ifndef tests_storePerfTools_asyncPerf_PerfTest_h_ +#define tests_storePerfTools_asyncPerf_PerfTest_h_ + +#include "TestResult.h" + +#include "tests/storePerfTools/common/Streamable.h" + +#include "qpid/framing/FieldTable.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Thread.h" + +#include <boost/shared_ptr.hpp> +#include <deque> + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +class AsyncStoreOptions; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockPersistableQueue; +class TestOptions; + +typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr; +typedef boost::shared_ptr<MockPersistableQueue> MockPersistableQueuePtr; + +class PerfTest : public tests::storePerftools::common::Streamable +{ +public: + PerfTest(const TestOptions& to, + const qpid::asyncStore::AsyncStoreOptions& aso); + virtual ~PerfTest(); + void run(); + void toStream(std::ostream& os = std::cout) const; + +protected: + const TestOptions& m_testOpts; + const qpid::asyncStore::AsyncStoreOptions& m_storeOpts; + TestResult m_testResult; + qpid::framing::FieldTable m_queueArgs; + const char* m_msgData; + boost::shared_ptr<qpid::sys::Poller> m_poller; + qpid::sys::Thread m_pollingThread; + + AsyncStoreImplPtr prepareStore(); + void prepareQueues(std::deque<MockPersistableQueuePtr>& jrnlList, + AsyncStoreImplPtr store); + void destroyQueues(std::deque<MockPersistableQueuePtr>& jrnlList); + +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_PerfTest_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp new file mode 100644 index 0000000000..315e202d8b --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file QueuedMessage.cpp + */ + +#include "QueuedMessage.h" + +#include "MockTransactionContext.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +QueuedMessage::QueuedMessage(MockPersistableMessagePtr msg, + qpid::broker::EnqueueHandle& enqHandle, + MockTransactionContextPtr txn) : + m_msg(msg), + m_enqHandle(enqHandle), + m_txn(txn) +{ + if (txn) { + txn->addEnqueuedMsg(this); + } +} + +QueuedMessage::~QueuedMessage() +{} + +MockPersistableMessagePtr +QueuedMessage::getMessage() const +{ + return m_msg; +} + +qpid::broker::EnqueueHandle +QueuedMessage::getEnqueueHandle() const +{ + return m_enqHandle; +} + +MockTransactionContextPtr +QueuedMessage::getTransactionContext() const +{ + return m_txn; +} + +bool +QueuedMessage::isTransactional() const +{ + return m_txn.get() != 0; +} + +void +QueuedMessage::clearTransaction() +{ + m_txn.reset(static_cast<MockTransactionContext*>(0)); +} + +}}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h new file mode 100644 index 0000000000..4b3dab67f9 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file QueuedMessage.h + */ + +#ifndef tests_storePerfTools_asyncPerf_QueuedMessage_h_ +#define tests_storePerfTools_asyncPerf_QueuedMessage_h_ + +#include "qpid/broker/EnqueueHandle.h" +#include <boost/shared_ptr.hpp> + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockPersistableMessage; +class MockTransactionContext; + +typedef boost::shared_ptr<MockPersistableMessage> MockPersistableMessagePtr; +typedef boost::shared_ptr<MockTransactionContext> MockTransactionContextPtr; + +class QueuedMessage +{ +public: + QueuedMessage(MockPersistableMessagePtr msg, + qpid::broker::EnqueueHandle& enqHandle, + MockTransactionContextPtr txn); + virtual ~QueuedMessage(); + MockPersistableMessagePtr getMessage() const; + qpid::broker::EnqueueHandle getEnqueueHandle() const; + MockTransactionContextPtr getTransactionContext() const; + bool isTransactional() const; + void clearTransaction(); + +protected: + MockPersistableMessagePtr m_msg; + qpid::broker::EnqueueHandle m_enqHandle; + MockTransactionContextPtr m_txn; +}; + +}}} // namespace tests::storePerfTools + +#endif // tests_storePerfTools_asyncPerf_QueuedMessage_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp new file mode 100644 index 0000000000..27784ef661 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestOptions.cpp + */ + +#include "TestOptions.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// static declarations +uint16_t TestOptions::s_defaultEnqTxnBlkSize = 0; +uint16_t TestOptions::s_defaultDeqTxnBlkSize = 0; + +TestOptions::TestOptions(const std::string& name) : + tests::storePerftools::common::TestOptions(name), + m_enqTxnBlockSize(s_defaultEnqTxnBlkSize), + m_deqTxnBlockSize(s_defaultDeqTxnBlkSize) +{ + doAddOptions(); +} + +TestOptions::TestOptions(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue, + const uint16_t enqTxnBlockSize, + const uint16_t deqTxnBlockSize, + const std::string& name) : + tests::storePerftools::common::TestOptions(numMsgs, msgSize, numQueues, numEnqThreadsPerQueue, numDeqThreadsPerQueue, name), + m_enqTxnBlockSize(enqTxnBlockSize), + m_deqTxnBlockSize(deqTxnBlockSize) +{ + doAddOptions(); +} + +TestOptions::~TestOptions() +{} + +void +TestOptions::printVals(std::ostream& os) const +{ + tests::storePerftools::common::TestOptions::printVals(os); + os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl; + os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl; +} + +void +TestOptions::doAddOptions() +{ + addOptions() + ("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"), + "Num enqueus per transaction (0 = no transactions)") + ("deq-txn-size,d", qpid::optValue(m_deqTxnBlockSize, "N"), + "Num dequeues per transaction (0 = no transactions)") + ; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h new file mode 100644 index 0000000000..b0e1e4ce74 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestOptions.h + */ + +#ifndef tests_storePerfTools_asyncPerf_TestOptions_h_ +#define tests_storePerfTools_asyncPerf_TestOptions_h_ + +#include "tests/storePerfTools/common/TestOptions.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class TestOptions : public tests::storePerftools::common::TestOptions +{ +public: + TestOptions(const std::string& name="Test Options"); + TestOptions(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue, + const uint16_t enqTxnBlockSize, + const uint16_t deqTxnBlockSize, + const std::string& name="Test Options"); + virtual ~TestOptions(); + void printVals(std::ostream& os) const; + + uint16_t m_enqTxnBlockSize; ///< Transaction block size for enqueues + uint16_t m_deqTxnBlockSize; ///< Transaction block size for dequeues + +protected: + static uint16_t s_defaultEnqTxnBlkSize; ///< Default transaction block size for enqueues + static uint16_t s_defaultDeqTxnBlkSize; ///< Default transaction block size for dequeues + + void doAddOptions(); +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_TestOptions_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp new file mode 100644 index 0000000000..cf6f293494 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.cpp + */ + +#include "TestResult.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +TestResult::TestResult(const TestOptions& to) : + tests::storePerftools::common::TestResult(), + m_testOpts(to) +{} + +TestResult::~TestResult() +{} + +void +TestResult::toStream(std::ostream& os) const +{ + double msgsRate; + os << "TEST RESULTS:" << std::endl; + os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl; + os << " Msg size: " << m_testOpts.m_msgSize << std::endl; + os << " No. queues: " << m_testOpts.m_numQueues << std::endl; + os << " No. enq threads/queue: " << m_testOpts.m_numEnqThreadsPerQueue << std::endl; + os << " No. deq threads/queue: " << m_testOpts.m_numDeqThreadsPerQueue << std::endl; + os << " Time taken: " << m_elapsed << " sec" << std::endl; + uint32_t msgsPerQueue = m_testOpts.m_numMsgs * m_testOpts.m_numEnqThreadsPerQueue; + if (m_testOpts.m_numQueues > 1) { + msgsRate = double(msgsPerQueue) / m_elapsed; + os << " No. msgs per queue: " << msgsPerQueue << std::endl; + os << "Per queue msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; + os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl; + } + uint32_t totalMsgs = msgsPerQueue * m_testOpts.m_numQueues; + msgsRate = double(totalMsgs) / m_elapsed; + os << " Total no. msgs: " << totalMsgs << std::endl; + os << " Broker msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; + os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h new file mode 100644 index 0000000000..1310689ff8 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.h + */ + +#ifndef tests_storePerfTools_asyncPerf_TestResult_h_ +#define tests_storePerfTools_asyncPerf_TestResult_h_ + +#include "TestOptions.h" + +#include "tests/storePerfTools/common/TestResult.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class TestOptions; + +/** + * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal. + * + * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the + * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the + * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput. + * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor. + * + * Results are available through the use of toStream(), toString() or the << operators. + * + * Output is in the following format: + * <pre> + * TEST RESULTS: + * Msgs per thread: 10000 + * Msg size: 2048 + * No. queues: 2 + * No. threads/queue: 2 + * Time taken: 1.6626 sec + * Total no. msgs: 40000 + * Msg throughput: 24.0587 kMsgs/sec + * 49.2723 MB/sec + * </pre> + */ +class TestResult : public tests::storePerftools::common::TestResult +{ +public: + /** + * \brief Constructor + * + * Constructor. Will start the time interval measurement. + * + * \param tp Test parameter details used to calculate the performance results. + */ + TestResult(const TestOptions& to); + + /** + * \brief Virtual destructor + */ + virtual ~TestResult(); + + /** + * \brief Stream the performance test results to an output stream + * + * Convenience feature which streams a multi-line performance result an output stream. + * + * \param os Output stream to which the results are to be streamed + */ + void toStream(std::ostream& os = std::cout) const; + +protected: + TestOptions m_testOpts; ///< Test parameters used for performance calculations + +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_TestResult_h_ diff --git a/cpp/src/tests/storePerfTools/common/Parameters.cpp b/cpp/src/tests/storePerfTools/common/Parameters.cpp new file mode 100644 index 0000000000..8e4bafaf86 --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/Parameters.cpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Parameters.cpp + */ + +#include "Parameters.h" + +namespace tests { +namespace storePerftools { +namespace common { + +Parameters::~Parameters() +{} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/Parameters.h b/cpp/src/tests/storePerfTools/common/Parameters.h new file mode 100644 index 0000000000..03dd2163f8 --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/Parameters.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Parameters.h + */ +#ifndef tests_storePerfTools_common_Parameters_h_ +#define tests_storePerfTools_common_Parameters_h_ + +#include "Streamable.h" + +namespace tests { +namespace storePerftools { +namespace common { + +class Parameters: public Streamable +{ +public: + virtual ~Parameters(); + virtual bool parseArg(const int arg, + const char* optarg) = 0; + +}; + +}}} // namespace tests::storePerfTools::common + +#endif // tests_storePerfTools_common_Parameters_h_ diff --git a/cpp/src/tests/storePerfTools/common/PerftoolError.cpp b/cpp/src/tests/storePerfTools/common/PerftoolError.cpp new file mode 100644 index 0000000000..5bb61b6519 --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/PerftoolError.cpp @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerftoolError.cpp + */ + +#include "PerftoolError.h" + +#include <iomanip> // std::setfill(), std::setw() + +namespace tests { +namespace storePerftools { +namespace common { + +// private +PerftoolError::PerftoolError() : + std::runtime_error(std::string()) +{} + +PerftoolError::PerftoolError(const uint32_t errCode) throw () : + std::runtime_error(std::string()), + m_errCode(errCode) +{ + formatWhatStr(); +} + +PerftoolError::PerftoolError(const std::string& errMsg) throw () : + std::runtime_error(std::string()), + m_errCode(0), + m_errMsg(errMsg) +{ + formatWhatStr(); +} + +PerftoolError::PerftoolError(const uint32_t errCode, + const std::string& errMsg) throw () : + std::runtime_error(std::string()), + m_errCode(errCode), + m_errMsg(errMsg) +{ + formatWhatStr(); +} + +PerftoolError::PerftoolError(const uint32_t errCode, + const std::string& throwingClass, + const std::string& throwingFunction) throw () : + std::runtime_error(std::string()), + m_errCode(errCode), + m_throwingClass(throwingClass), + m_throwingFunction(throwingFunction) +{ + formatWhatStr(); +} + +PerftoolError::PerftoolError(const std::string& errMsg, + const std::string& throwingClass, + const std::string& throwingFunction) throw () : + std::runtime_error(std::string()), + m_errCode(0), + m_errMsg(errMsg), + m_throwingClass(throwingClass), + m_throwingFunction(throwingFunction) +{ + formatWhatStr(); +} + +PerftoolError::PerftoolError(const uint32_t errCode, + const std::string& errMsg, + const std::string& throwingClass, + const std::string& throwingFunction) throw () : + std::runtime_error(std::string()), + m_errCode(errCode), + m_errMsg(errMsg), + m_throwingClass(throwingClass), + m_throwingFunction(throwingFunction) +{} + +PerftoolError::~PerftoolError() throw() +{} + +const char* +PerftoolError::what() const throw () +{ + return m_what.c_str(); +} + +uint32_t +PerftoolError::getErrorCode() const throw () +{ + return m_errCode; +} + +const std::string +PerftoolError::getAdditionalInfo() const throw () +{ + return m_errMsg; +} + +const std::string +PerftoolError::getThrowingClass() const throw () +{ + return m_throwingClass; +} + +const std::string +PerftoolError::getThrowingFunction() const throw () +{ + return m_throwingFunction; +} + +void +PerftoolError::toStream(std::ostream& os) const +{ + os << what(); +} + +// protected +void +PerftoolError::formatWhatStr() throw () +{ + try { + const bool ai = !m_errMsg.empty(); + const bool tc = !m_throwingClass.empty(); + const bool tf = !m_throwingFunction.empty(); + std::ostringstream oss; + oss << className() << " 0x" << std::hex << std::setfill('0') << std::setw(4) << m_errCode << " "; + if (tc) { + oss << m_throwingClass; + if (tf) { + oss << "::"; + } else { + oss << " "; + } + } + if (tf) { + oss << m_throwingFunction << "() "; + } + if (tc || tf) { + oss << "threw " << s_errorMessage(m_errCode); + } + if (ai) { + oss << " (" << m_errMsg << ")"; + } + m_what.assign(oss.str()); + } catch (...) {} +} + +// protected +const char* +PerftoolError::className() +{ + return s_className; +} + +//static +const char* PerftoolError::s_className = "PerftoolError"; + +// --- Static definitions --- +PerftoolError::errorMap_t PerftoolError::s_errorMap; +PerftoolError::errorMapCitr_t PerftoolError::s_errorMapIterator; +bool PerftoolError::s_initializedFlag = PerftoolError::s_initialize(); + +// --- Generic and system errors --- +const uint32_t PerftoolError::PERR_PTHREAD = 0x0001; + +// static +const char* +PerftoolError::s_errorMessage(const uint32_t err_no) throw () +{ + s_errorMapIterator = s_errorMap.find(err_no); + if (s_errorMapIterator == s_errorMap.end()) + return "<Unknown error code>"; + return s_errorMapIterator->second; +} + +// protected static +bool +PerftoolError::s_initialize() +{ + s_errorMap[PERR_PTHREAD] = "ERR_PTHREAD: pthread operation failure"; + + return true; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/PerftoolError.h b/cpp/src/tests/storePerfTools/common/PerftoolError.h new file mode 100644 index 0000000000..8f994d7d28 --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/PerftoolError.h @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerftoolError.h + */ + +#ifndef tests_storePerfTools_common_PerftoolError_h_ +#define tests_storePerfTools_common_PerftoolError_h_ + +#include "Streamable.h" + +#include <map> +#include <stdexcept> // std::runtime_error +#include <stdint.h> // uint32_t + +// Macro definitions + +#include <cstring> // std::strerror() +#include <sstream> // std::ostringstream + +/** + * \brief Macro to retrieve and format the C errno value as a string. + * + * \param errno Value of errno to be formatted. + */ +#define FORMAT_SYSERR(errno) " errno=" << errno << " (" << std::strerror(errno) << ")" + +/** + * \brief Macro to check for a clean pthread creation and throwing a JournalException with code JERR_PTHREAD if + * thread creation failed. + * + * \param err Value or errno. + * \param pfn Name of system call that failed. + * \param cls Name of class in which function failed. + * \param fn Name of class function that failed. + */ +#define PTHREAD_CHK(err, pfn, cls, fn) if(err != 0) { \ + std::ostringstream oss; \ + oss << pfn << " failed: " << FORMAT_SYSERR(err); \ + throw tests::storePerftools::common::PerftoolError(tests::storePerftools::common::PerftoolError::PERR_PTHREAD, oss.str(), cls, fn); \ + } + +namespace tests { +namespace storePerftools { +namespace common { + +class PerftoolError: public std::runtime_error, public Streamable +{ +public: + // --- Constructors & destructors --- + PerftoolError(const uint32_t errCode) throw (); + PerftoolError(const std::string& errMsg) throw (); + PerftoolError(const uint32_t errCode, + const std::string& errMsg) throw (); + PerftoolError(const uint32_t errCode, + const std::string& throwingClass, + const std::string& throwingFunction) throw (); + PerftoolError(const std::string& errMsg, + const std::string& throwingClass, + const std::string& throwingFunction) throw (); + PerftoolError(const uint32_t errCode, + const std::string& errMsg, + const std::string& throwingClass, + const std::string& throwingFunction) throw (); + virtual ~PerftoolError() throw(); + + const char* what() const throw (); // overrides std::runtime_error::what() + uint32_t getErrorCode() const throw (); + const std::string getAdditionalInfo() const throw (); + const std::string getThrowingClass() const throw (); + const std::string getThrowingFunction() const throw (); + + // --- Implementation of class Streamable --- + virtual void toStream(std::ostream& os = std::cout) const; + + // --- Generic and system errors --- + static const uint32_t PERR_PTHREAD; ///< pthread operation failure + + + static const char* s_errorMessage(const uint32_t err_no) throw (); + + +protected: + uint32_t m_errCode; ///< Error or failure code, taken from JournalErrors. + std::string m_errMsg; ///< Additional information pertaining to the error or failure. + std::string m_throwingClass; ///< Name of the class throwing the error. + std::string m_throwingFunction; ///< Name of the function throwing the error. + std::string m_what; ///< Standard error of failure message, taken from JournalErrors. + + void formatWhatStr() throw (); + virtual const char* className(); + + typedef std::map<uint32_t, const char*> errorMap_t; ///< Type for map of error messages + typedef errorMap_t::const_iterator errorMapCitr_t; ///< Const iterator for map of error messages + + static errorMap_t s_errorMap; ///< Map of error messages + static errorMapCitr_t s_errorMapIterator; ///< Const iterator + +private: + static const char* s_className; ///< Name of this class, used in formatting error messages. + static bool s_initializedFlag; ///< Dummy flag, used to initialize map. + + PerftoolError(); + static bool s_initialize(); ///< Static fn for initializing static data + +}; + +}}} // namespace tests::stprePerftools::common + +#endif // tests_storePerfTools_common_PerftoolError_h_ diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimable.cpp b/cpp/src/tests/storePerfTools/common/ScopedTimable.cpp new file mode 100644 index 0000000000..c2023b7854 --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/ScopedTimable.cpp @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ScopedTimable.cpp + */ + +#include "ScopedTimable.h" + +namespace tests { +namespace storePerftools { +namespace common { + +ScopedTimable::ScopedTimable() : + m_elapsed(0.0) +{} + +ScopedTimable::~ScopedTimable() +{} + +double& +ScopedTimable::getElapsedRef() +{ + return m_elapsed; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimable.h b/cpp/src/tests/storePerfTools/common/ScopedTimable.h new file mode 100644 index 0000000000..3c21a4aafa --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/ScopedTimable.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ScopedTimable.h + */ + +#ifndef tests_storePerfTools_common_ScopedTimable_h_ +#define tests_storePerfTools_common_ScopedTimable_h_ + +namespace tests { +namespace storePerftools { +namespace common { + +/** + * \brief Scoped timer class that starts timing on construction and finishes on destruction. + * + * This class is designed to be the parent class for a performance result class which depends on the elapsed + * time of some process or event. By passing this (or its subclasses) to ScopedTimer (which only exists within + * the scope of the event), the _elapsed member of this class will be written with the elapsed time when the + * ScopedTimer object goes out of scope or is destroyed. + * + * Subclasses may be aware of the parameters being timed, and may thus print and/or display performance and/or + * rate information for these parameters. + */ +class ScopedTimable +{ +public: + ScopedTimable(); + virtual ~ScopedTimable(); + double& getElapsedRef(); + +protected: + double m_elapsed; ///< Elapsed time, will be written on destruction of ScopedTimer instances + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerfTools_common_ScopedTimable_h_ diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimer.cpp b/cpp/src/tests/storePerfTools/common/ScopedTimer.cpp new file mode 100644 index 0000000000..8312174cad --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/ScopedTimer.cpp @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ScopedTimer.cpp + */ + +#include "ScopedTimer.h" + +#include "ScopedTimable.h" + +namespace tests { +namespace storePerftools { +namespace common { + +ScopedTimer::ScopedTimer(double& elapsed) : + m_elapsed(elapsed) +{ + ::clock_gettime(CLOCK_REALTIME, &m_startTime); +} + +ScopedTimer::ScopedTimer(ScopedTimable& st) : + m_elapsed(st.getElapsedRef()) +{ + ::clock_gettime(CLOCK_REALTIME, &m_startTime); +} + +ScopedTimer::~ScopedTimer() +{ + ::timespec stopTime; + ::clock_gettime(CLOCK_REALTIME, &stopTime); + m_elapsed = _s_getDoubleTime(stopTime) - _s_getDoubleTime(m_startTime); +} + +// static +double ScopedTimer::_s_getDoubleTime(const ::timespec& ts) +{ + return ts.tv_sec + (double(ts.tv_nsec) / 1e9); +} + + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/ScopedTimer.h b/cpp/src/tests/storePerfTools/common/ScopedTimer.h new file mode 100644 index 0000000000..c23d056f10 --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/ScopedTimer.h @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file ScopedTimer.h + */ + +#ifndef tests_storePerfTools_common_ScopedTimer_h_ +#define tests_storePerfTools_common_ScopedTimer_h_ + +#include <ctime> + +namespace tests { +namespace storePerftools { +namespace common { + +class ScopedTimable; + +/** + * \brief Scoped timer class that starts timing on construction and finishes on destruction. + * + * The scoped timer will take the current time on construction and again on destruction. The destructor + * will calculate the elapsed time from the difference between these two times and write the result + * as a double to the double ref supplied to the constructor. A second constructor will accept a class (or + * subclass) of ScopedTimable, which contains a double to which the result may be written and accessed at a + * later time. + */ +class ScopedTimer +{ +public: + /** + * \brief Constructor + * + * Constructor which accepts a ref to a double. Will start the time interval measurement. + * + * \param elapsed A ref to a double which will contain the elapsed time in seconds after this class instance + * is destroyed. + */ + ScopedTimer(double& elapsed); + + /** + * \brief Constructor + * + * Constructor which accepts a ref to a ScopedTimable. Will start the time interval measurement. + * + * \param st A ref to a ScopedTimable into which the result of the ScopedTimer can be written. + */ + ScopedTimer(ScopedTimable& st); + + /** + * \brief Destructor + * + * Destructor. Will stop the time interval measurement and write the calculated elapsed time into _elapsed. + */ + virtual ~ScopedTimer(); + +protected: + double& m_elapsed; ///< Ref to elapsed time, will be written on destruction of ScopedTimer instances + ::timespec m_startTime; ///< Start time, set on construction + + /** + * \brief Convert ::timespec to seconds + * + * Static function to convert a ::timespec struct into a double representation in seconds. + * + * \param ts std::timespec struct containing the time to be converted. + * \return A double which represents the time in parameter ts in seconds. + */ + static double _s_getDoubleTime(const ::timespec& ts); + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerfTools_common_ScopedTimer_h_ diff --git a/cpp/src/tests/storePerfTools/common/Streamable.cpp b/cpp/src/tests/storePerfTools/common/Streamable.cpp new file mode 100644 index 0000000000..8c58f1c03e --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/Streamable.cpp @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Streamable.cpp + */ + +#include "Streamable.h" + +#include <sstream> + +namespace tests { +namespace storePerftools { +namespace common { + +std::string +Streamable::toString() const +{ + std::ostringstream oss; + toStream(oss); + return oss.str(); +} + +std::ostream& +operator<<(std::ostream& os, const Streamable& s) +{ + s.toStream(os); + return os; +} + +std::ostream& +operator<<(std::ostream& os, const Streamable* sPtr) +{ + sPtr->toStream(os); + return os; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/Streamable.h b/cpp/src/tests/storePerfTools/common/Streamable.h new file mode 100644 index 0000000000..504a3d97dd --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/Streamable.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Streamable.h + */ + +#ifndef tests_storePerfTools_common_Streamable_h_ +#define tests_storePerfTools_common_Streamable_h_ + +#include <iostream> + +namespace tests { +namespace storePerftools { +namespace common { + +/** + * \brief Abstract class which provides the mechanisms to stream + * + * An abstract class which provides stream functions. The toStream() function must be implemented by subclasses, + * and is used by the remaining functions. For convenience, toString() returns a std::string object. + */ +class Streamable +{ +public: + /** + * \brief Virtual destructor + */ + virtual ~Streamable() {} + + /*** + * \brief Stream some representation of the object to an output stream + * + * \param os Output stream to which the class data is to be streamed + */ + virtual void toStream(std::ostream& os = std::cout) const = 0; + + /** + * \brief Creates a string representation of the test parameters + * + * Convenience feature which creates and returns a std::string object containing the content of toStream(). + * + * \return Content of toStream() + */ + std::string toString() const; + + /** + * \brief Stream the object to an output stream + */ + friend std::ostream& operator<<(std::ostream& os, + const Streamable& s); + + /** + * \brief Stream the object to an output stream through an object pointer + */ + friend std::ostream& operator<<(std::ostream& os, + const Streamable* sPtr); + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerfTools_common_Streamable_h_ diff --git a/cpp/src/tests/storePerfTools/common/TestOptions.cpp b/cpp/src/tests/storePerfTools/common/TestOptions.cpp new file mode 100644 index 0000000000..39e3434a6c --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/TestOptions.cpp @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestOptions.cpp + */ + +#include "TestOptions.h" + +namespace tests { +namespace storePerftools { +namespace common { + +// static declarations +uint32_t TestOptions::s_defaultNumMsgs = 1024; +uint32_t TestOptions::s_defaultMsgSize = 1024; +uint16_t TestOptions::s_defaultNumQueues = 1; +uint16_t TestOptions::s_defaultEnqThreadsPerQueue = 1; +uint16_t TestOptions::s_defaultDeqThreadsPerQueue = 1; + +TestOptions::TestOptions(const std::string& name) : + qpid::Options(name), + m_numMsgs(s_defaultNumMsgs), + m_msgSize(s_defaultMsgSize), + m_numQueues(s_defaultNumQueues), + m_numEnqThreadsPerQueue(s_defaultEnqThreadsPerQueue), + m_numDeqThreadsPerQueue(s_defaultDeqThreadsPerQueue) +{ + doAddOptions(); +} + +TestOptions::TestOptions(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue, + const std::string& name) : + qpid::Options(name), + m_numMsgs(numMsgs), + m_msgSize(msgSize), + m_numQueues(numQueues), + m_numEnqThreadsPerQueue(numEnqThreadsPerQueue), + m_numDeqThreadsPerQueue(numDeqThreadsPerQueue) +{ + doAddOptions(); +} + +TestOptions::~TestOptions() +{} + +void +TestOptions::printVals(std::ostream& os) const +{ + os << "TEST OPTIONS:" << std::endl; + os << " Number of queues [-q, --num-queues]: " << m_numQueues << std::endl; + os << " Number of producers per queue [-p, --num-producers]: " << m_numEnqThreadsPerQueue << std::endl; + os << " Number of consumers per queue [-c, --num-consumers]: " << m_numDeqThreadsPerQueue << std::endl; + os << " Number of messages to send per producer [-m, --num-msgs]: " << m_numMsgs << std::endl; + os << " Size of each message (bytes) [-s, --msg-size]: " << m_msgSize << std::endl; +} + +void +TestOptions::validate() +{ + if (((m_numEnqThreadsPerQueue * m_numMsgs) % m_numDeqThreadsPerQueue) != 0) { + throw qpid::Exception("Parameter Error: (num-producers * num-msgs) must be a multiple of num-consumers."); + } +} + +void +TestOptions::doAddOptions() +{ + addOptions() + ("num-queues,q", qpid::optValue(m_numQueues, "N"), + "Number of queues") + ("num-producers,p", qpid::optValue(m_numEnqThreadsPerQueue, "N"), + "Number of producers per queue") + ("num-consumers,c", qpid::optValue(m_numDeqThreadsPerQueue, "N"), + "Number of consumers per queue") + ("num-msgs,m", qpid::optValue(m_numMsgs, "N"), + "Number of messages to send per producer") + ("msg-size,s", qpid::optValue(m_msgSize, "N"), + "Size of each message (bytes)") + ; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/TestOptions.h b/cpp/src/tests/storePerfTools/common/TestOptions.h new file mode 100644 index 0000000000..6a4479fa56 --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/TestOptions.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestOptions.h + */ + +#ifndef tests_storePerfTools_common_TestOptions_h_ +#define tests_storePerfTools_common_TestOptions_h_ + +#include "qpid/Options.h" + +namespace tests { +namespace storePerftools { +namespace common { + +class TestOptions : public qpid::Options +{ +public: + TestOptions(const std::string& name="Test Options"); + TestOptions(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue, + const std::string& name="Test Options"); + virtual ~TestOptions(); + void printVals(std::ostream& os) const; + void validate(); + + uint32_t m_numMsgs; ///< Number of messages to be sent + uint32_t m_msgSize; ///< Message size in bytes + uint16_t m_numQueues; ///< Number of queues to test simultaneously + uint16_t m_numEnqThreadsPerQueue; ///< Number of enqueue threads per queue + uint16_t m_numDeqThreadsPerQueue; ///< Number of dequeue threads per queue + +protected: + static uint32_t s_defaultNumMsgs; ///< Default number of messages to be sent + static uint32_t s_defaultMsgSize; ///< Default message size in bytes + static uint16_t s_defaultNumQueues; ///< Default number of queues to test simultaneously + static uint16_t s_defaultEnqThreadsPerQueue; ///< Default number of enqueue threads per queue + static uint16_t s_defaultDeqThreadsPerQueue; ///< Default number of dequeue threads per queue + + void doAddOptions(); + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerfTools_common_TestOptions_h_ diff --git a/cpp/src/tests/storePerfTools/common/TestParameters.cpp b/cpp/src/tests/storePerfTools/common/TestParameters.cpp new file mode 100644 index 0000000000..f36a2d3bda --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/TestParameters.cpp @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestParameters.cpp + */ + +#include "TestParameters.h" + +#include <cstdlib> // std::atoi, std::atol + +namespace tests { +namespace storePerftools { +namespace common { + +// static declarations +uint32_t TestParameters::s_defaultNumMsgs = 1024; +uint32_t TestParameters::s_defaultMsgSize = 1024; +uint16_t TestParameters::s_defaultNumQueues = 1; +uint16_t TestParameters::s_defaultEnqThreadsPerQueue = 1; +uint16_t TestParameters::s_defaultDeqThreadsPerQueue = 1; + +TestParameters::TestParameters(): + Parameters(), + m_numMsgs(s_defaultNumMsgs), + m_msgSize(s_defaultMsgSize), + m_numQueues(s_defaultNumQueues), + m_numEnqThreadsPerQueue(s_defaultEnqThreadsPerQueue), + m_numDeqThreadsPerQueue(s_defaultDeqThreadsPerQueue)//, +{} + +TestParameters::TestParameters(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue) : + Parameters(), + m_numMsgs(numMsgs), + m_msgSize(msgSize), + m_numQueues(numQueues), + m_numEnqThreadsPerQueue(numEnqThreadsPerQueue), + m_numDeqThreadsPerQueue(numDeqThreadsPerQueue) +{} + +TestParameters::TestParameters(const TestParameters& tp): + Parameters(), + m_numMsgs(tp.m_numMsgs), + m_msgSize(tp.m_msgSize), + m_numQueues(tp.m_numQueues), + m_numEnqThreadsPerQueue(tp.m_numEnqThreadsPerQueue), + m_numDeqThreadsPerQueue(tp.m_numDeqThreadsPerQueue) +{} + +TestParameters::~TestParameters() +{} + +void +TestParameters::toStream(std::ostream& os) const +{ + os << "Test Parameters:" << std::endl; + os << " num_msgs = " << m_numMsgs << std::endl; + os << " msg_size = " << m_msgSize << std::endl; + os << " num_queues = " << m_numQueues << std::endl; + os << " num_enq_threads_per_queue = " << m_numEnqThreadsPerQueue << std::endl; + os << " num_deq_threads_per_queue = " << m_numDeqThreadsPerQueue << std::endl; +} + +bool +TestParameters::parseArg(const int arg, + const char* optarg) +{ + switch(arg) { + case 'm': + m_numMsgs = uint32_t(std::atol(optarg)); + break; + case 'S': + m_msgSize = uint32_t(std::atol(optarg)); + break; + case 'q': + m_numQueues = uint16_t(std::atoi(optarg)); + break; + case 'e': + m_numEnqThreadsPerQueue = uint16_t(std::atoi(optarg)); + break; + case 'd': + m_numDeqThreadsPerQueue = uint16_t(std::atoi(optarg)); + break; + default: + return false; + } + return true; +} + +// static +void +TestParameters::printArgs(std::ostream& os) +{ + os << "Test parameters:" << std::endl; + os << " -m --num_msgs: Number of messages to send per enqueue thread [" + << TestParameters::s_defaultNumMsgs << "]" << std::endl; + os << " -S --msg_size: Size of each message to be sent [" + << TestParameters::s_defaultMsgSize << "]" << std::endl; + os << " -q --num_queues: Number of simultaneous queues [" + << TestParameters::s_defaultNumQueues << "]" << std::endl; + os << " -e --num_enq_threads_per_queue: Number of enqueue threads per queue [" + << TestParameters::s_defaultEnqThreadsPerQueue << "]" << std::endl; + os << " -d --num_deq_threads_per_queue: Number of dequeue threads per queue [" + << TestParameters::s_defaultDeqThreadsPerQueue << "]" << std::endl; + os << std::endl; +} + +// static +std::string +TestParameters::shortArgs() +{ + return "m:S:q:e:d:"; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/TestParameters.h b/cpp/src/tests/storePerfTools/common/TestParameters.h new file mode 100644 index 0000000000..ea73589609 --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/TestParameters.h @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestParameters.h + */ + +#ifndef tests_storePerfTools_common_TestParameters_h_ +#define tests_storePerfTools_common_TestParameters_h_ + +#include "Parameters.h" + +#include <stdint.h> // uint16_t, uint32_t + +namespace tests { +namespace storePerftools { +namespace common { + +class TestOptions; + +/** + * \brief Struct for aggregating the test parameters + * + * This struct is used to aggregate and keep together all the test parameters. These affect the test itself, the + * journal geometry is aggregated in class JrnlParameters. + */ +class TestParameters : public Parameters +{ +public: + static uint32_t s_defaultNumMsgs; ///< Default number of messages to be sent + static uint32_t s_defaultMsgSize; ///< Default message size in bytes + static uint16_t s_defaultNumQueues; ///< Default number of queues to test simultaneously + static uint16_t s_defaultEnqThreadsPerQueue; ///< Default number of enqueue threads per queue + static uint16_t s_defaultDeqThreadsPerQueue; ///< Default number of dequeue threads per queue + + uint32_t m_numMsgs; ///< Number of messages to be sent + uint32_t m_msgSize; ///< Message size in bytes + uint16_t m_numQueues; ///< Number of queues to test simultaneously + uint16_t m_numEnqThreadsPerQueue; ///< Number of enqueue threads per queue + uint16_t m_numDeqThreadsPerQueue; ///< Number of dequeue threads per queue + + /** + * \brief Defaault constructor + * + * Default constructor. Uses the default values for all parameters. + */ + TestParameters(); + + /** + * \brief Constructor + * + * Convenience constructor. + * + * \param numMsgs Number of messages to be sent + * \param msgSize Message size in bytes + * \param numQueues Number of queues to test simultaneously + * \param numEnqThreadsPerQueue Number of enqueue threads per queue + * \param numDeqThreadsPerQueue Number of dequeue threads per queue + */ + TestParameters(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue); + + /** + * \brief Copy constructor + * + * \param tp Reference to JrnlPerfTestParameters instance to be copied + */ + TestParameters(const TestParameters& tp); + + /** + * \brief Virtual destructor + */ + virtual ~TestParameters(); + + virtual bool parseArg(const int arg, + const char* optarg); + + static void printArgs(std::ostream& os); + + static std::string shortArgs(); + + /*** + * \brief Stream the test parameters to an output stream + * + * Convenience feature which streams a multi-line representation of all the test parameters, one per line to an + * output stream. + * + * \param os Output stream to which the class data is to be streamed + */ + void toStream(std::ostream& os = std::cout) const; + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerfTools_common_TestParameters_h_ diff --git a/cpp/src/tests/storePerfTools/common/TestResult.cpp b/cpp/src/tests/storePerfTools/common/TestResult.cpp new file mode 100644 index 0000000000..c3e9d27dfc --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/TestResult.cpp @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.cpp + */ + +#include "TestResult.h" + +namespace tests { +namespace storePerftools { +namespace common { + +TestResult::TestResult() : + ScopedTimable(), + Streamable() +{} + +TestResult::~TestResult() +{} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/TestResult.h b/cpp/src/tests/storePerfTools/common/TestResult.h new file mode 100644 index 0000000000..e2217286b4 --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/TestResult.h @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.h + */ + +#ifndef tests_storePerfTools_common_TestResult_h_ +#define tests_storePerfTools_common_TestResult_h_ + +#include "ScopedTimable.h" +#include "Streamable.h" + +namespace tests { +namespace storePerftools { +namespace common { + +class TestResult : public ScopedTimable, public Streamable +{ +public: + TestResult(); + virtual ~TestResult(); + void toStream(std::ostream& os = std::cout) const = 0; +}; + +}}} // namespace tests:storePerftools::common + +#endif // tests_storePerfTools_common_TestResult_h_ diff --git a/cpp/src/tests/storePerfTools/common/Thread.cpp b/cpp/src/tests/storePerfTools/common/Thread.cpp new file mode 100644 index 0000000000..188e102e8f --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/Thread.cpp @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Thread.cpp + */ + +#include "Thread.h" + +#include "PerftoolError.h" + +namespace tests { +namespace storePerftools { +namespace common { + +Thread::Thread(startFn_t sf, + void* p) : + m_running(true) +{ + PTHREAD_CHK(::pthread_create(&m_thread, NULL, sf, p), "::pthread_create", "Thread", "Thread"); +} + +Thread::Thread(Thread::startFn_t sf, + void* p, + const std::string& id) : + m_id(id), + m_running(true) +{ + PTHREAD_CHK(::pthread_create(&m_thread, NULL, sf, p), "::pthread_create", "Thread", "Thread"); +} + +Thread::~Thread() +{ + if (m_running) { + PTHREAD_CHK(::pthread_detach(m_thread), "pthread_detach", "~Thread", "Thread"); + } +} + +const std::string& +Thread::getId() const +{ + return m_id; +} + +void Thread::join() +{ + PTHREAD_CHK(::pthread_join(m_thread, NULL), "pthread_join", "join", "Thread"); + m_running = false; +} + +}}} // namespace tests::storePerftools::common diff --git a/cpp/src/tests/storePerfTools/common/Thread.h b/cpp/src/tests/storePerfTools/common/Thread.h new file mode 100644 index 0000000000..505d038162 --- /dev/null +++ b/cpp/src/tests/storePerfTools/common/Thread.h @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Thread.h + */ + +#ifndef tests_storePerfTools_common_Thread_h_ +#define tests_storePerfTools_common_Thread_h_ + +#include <pthread.h> +#include <string> + +namespace tests { +namespace storePerftools { +namespace common { + +/** + * \brief Ultra-simple pthread class. + */ +class Thread { +public: + typedef void*(*startFn_t)(void*); ///< Thread entry point function pointer type + + /** + * \brief Constructor + * \param sf Pointer to thread entry function + * \param p Void pointer to parameter of start function + */ + Thread(startFn_t sf, + void* p); + + /** + * \brief Constructor + * \param sf Pointer to thread entry function + * \param p Void pointer to parameter of start function + * \param id Name of this thread instance + */ + Thread(startFn_t sf, + void* p, + const std::string& id); + + /** + * \brief Destructor + */ + virtual ~Thread(); + + /** + * \brief Get the name of this thread. + * \return Name as supplied to the constructor. + */ + const std::string& getId() const; + + /** + * \brief Wait for this thread instance to finish running startFn(). + */ + void join(); + +private: + ::pthread_t m_thread; ///< Internal posix thread + std::string m_id; ///< Identifier for this thread instance + bool m_running; ///< \b true is the thread is active and running, \b false when not yet started or joined. + +}; + +}}} // namespace tests::storePerftools::common + +#endif // tests_storePerfTools_common_Thread_h_ diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp new file mode 100644 index 0000000000..6efdc06fc8 --- /dev/null +++ b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.cpp @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Journal.cpp + */ + +#include "Journal.h" + +#ifdef JOURNAL2 +# include "qpid/asyncStore/jrnl2/DataToken.h" + +# define X_JRNL_FN_DEQUEUE(dtok) dequeue(dtok, 0, 0) +# define X_JRNL_FN_ENQUEUE(dtok, msgData, msgSize) enqueue(dtok, msgData, msgSize, 0, 0, false) +# define X_JRNL_FN_FLUSH(jrnlPtr) { jrnlPtr->flush(); jrnlPtr->sync(); } +# define X_JRNL_FN_GETDTOKSTATUS(dtok) dtok +# define X_JRNL_FN_GETEVENTS(timeout) processCompletedAioWriteEvents(timeout) +# define X_JRNL_FN_GETIOSTR(iores) qpid::asyncStore::jrnl2::g_ioResAsString(iores) +# define X_JRNL_IO_OP_RES qpid::asyncStore::jrnl2::jrnlOpRes +# define X_JRNL_IO_OP_RES_BUSY qpid::asyncStore::jrnl2::RHM_IORES_BUSY +# define X_JRNL_IO_OP_RES_ENQCAPTHRESH qpid::asyncStore::jrnl2::RHM_IORES_ENQCAPTHRESH +# define X_JRNL_IO_OP_RES_SUCCESS 0 +# define X_SCOPED_LOCK qpid::asyncStore::jrnl2::ScopedLock +#else +# include "jrnl/jcntl.hpp" +# include "jrnl/data_tok.hpp" + +# define X_JRNL_FN_DEQUEUE(dtok) dequeue_data_record(dtok) +# define X_JRNL_FN_ENQUEUE(dtok, msgData, msgSize) enqueue_data_record(msgData, msgSize, msgSize, dtok); +# define X_JRNL_FN_FLUSH(jrnlPtr) jrnlPtr->flush(true) +# define X_JRNL_FN_GETDTOKSTATUS(dtok) dtok->status_str() +# define X_JRNL_FN_GETEVENTS(timeout) get_wr_events(timeout) +# define X_JRNL_FN_GETIOSTR(iores) mrg::journal::iores_str(iores) +# define X_JRNL_IO_OP_RES mrg::journal::iores +# define X_JRNL_IO_OP_RES_BUSY mrg::journal::RHM_IORES_BUSY +# define X_JRNL_IO_OP_RES_ENQCAPTHRESH mrg::journal::RHM_IORES_ENQCAPTHRESH +# define X_JRNL_IO_OP_RES_SUCCESS mrg::journal::RHM_IORES_SUCCESS +# define X_SCOPED_LOCK mrg::journal::slock +#endif + +#include <iostream> + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +Journal::Journal(const uint32_t numMsgs, + const uint32_t msgSize, + const char* msgData, + X_ASYNC_JOURNAL* const jrnlPtr) : + m_numMsgs(numMsgs), + m_msgSize(msgSize), + m_msgData(msgData), + m_jrnlPtr(jrnlPtr) +{} + +Journal::~Journal() +{ + delete m_jrnlPtr; +} + + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +// Enqueue thread entry point +void* +Journal::runEnqueues() +{ + bool misfireFlag = false; + uint32_t i = 0; + while (i < m_numMsgs) { + X_DATA_TOKEN* mtokPtr = new X_DATA_TOKEN(); + X_JRNL_IO_OP_RES jrnlIoRes = m_jrnlPtr->X_JRNL_FN_ENQUEUE(mtokPtr, m_msgData, m_msgSize); + switch (jrnlIoRes) { + case X_JRNL_IO_OP_RES_SUCCESS: + i++; + misfireFlag = false; + break; + case X_JRNL_IO_OP_RES_BUSY: + if (!misfireFlag) { + std::cout << "-" << std::flush; + } + delete mtokPtr; + misfireFlag = true; + break; + case X_JRNL_IO_OP_RES_ENQCAPTHRESH: + if (!misfireFlag) { + std::cout << "*" << std::flush; + } + delete mtokPtr; + misfireFlag = true; + ::usleep(10); + break; + default: + delete mtokPtr; + std::cerr << "enqueue_data_record FAILED with " << X_JRNL_FN_GETIOSTR(jrnlIoRes) << std::endl; + } + } + /// \todo handle these results + X_JRNL_FN_FLUSH(m_jrnlPtr); + return NULL; +} + + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +// Dequeue thread entry point +void* +Journal::runDequeues() +{ + uint32_t i = 0; + X_JRNL_IO_OP_RES jrnlIoRes; + while (i < m_numMsgs) { + X_DATA_TOKEN* mtokPtr = 0; + while (!mtokPtr) { + bool procAioEventsFlag; + { // --- START OF CRITICAL SECTION --- + X_SCOPED_LOCK l(m_unprocCallbacksMutex); + procAioEventsFlag = m_unprocCallbacks.empty(); + if (!procAioEventsFlag) { + mtokPtr = m_unprocCallbacks.back(); + m_unprocCallbacks.pop_back(); + } + } // --- END OF CRITICAL SECTION --- + if (procAioEventsFlag) { + m_jrnlPtr->X_JRNL_FN_GETEVENTS(0); + ::usleep(1); + } + } + bool done = false; + while (!done) { + jrnlIoRes = m_jrnlPtr->X_JRNL_FN_DEQUEUE(mtokPtr); + switch (jrnlIoRes) { + case X_JRNL_IO_OP_RES_SUCCESS: + i ++; + done = true; + break; + case X_JRNL_IO_OP_RES_BUSY: + //::usleep(10); + break; + default: + std::cerr << "dequeue_data_record FAILED with " << X_JRNL_FN_GETIOSTR(jrnlIoRes) << ": " + << X_JRNL_FN_GETDTOKSTATUS(mtokPtr) << std::endl; + delete mtokPtr; + done = true; + } + } + m_jrnlPtr->X_JRNL_FN_GETEVENTS(0); + } + /// \todo handle these results + X_JRNL_FN_FLUSH(m_jrnlPtr); + return NULL; +} + +//static +void* +Journal::startEnqueues(void* ptr) +{ + return reinterpret_cast<Journal*>(ptr)->runEnqueues(); +} + +//static +void* +Journal:: startDequeues(void* ptr) +{ + return reinterpret_cast<Journal*>(ptr)->runDequeues(); +} + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +void +Journal::X_AIO_WR_CALLBACK(std::vector<X_DATA_TOKEN*>& msgTokenList) +{ + X_DATA_TOKEN* mtokPtr; + while (msgTokenList.size()) { + mtokPtr = msgTokenList.back(); + msgTokenList.pop_back(); +#ifdef JOURNAL2 + switch (mtokPtr->getDataOpState().get()) { + case qpid::asyncStore::jrnl2::OP_ENQUEUE: +#else + switch (mtokPtr->wstate()) { + case X_DATA_TOKEN::ENQ: +#endif + { // --- START OF CRITICAL SECTION --- + X_SCOPED_LOCK l(m_unprocCallbacksMutex); + m_unprocCallbacks.push_back(mtokPtr); + } // --- END OF CRITICAL SECTION --- + break; + default: + delete mtokPtr; + } + } +} + +// *** MUST BE THREAD-SAFE **** +// This method will be called by multiple threads simultaneously +void +Journal::X_AIO_RD_CALLBACK(std::vector<uint16_t>& /*buffPageCtrlBlkIndexList*/) +{} + +}}} // namespace tests::storePerftools::jrnlPerf diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/Journal.h b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.h new file mode 100644 index 0000000000..19d0980b53 --- /dev/null +++ b/cpp/src/tests/storePerfTools/jrnlPerf/Journal.h @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Journal.h + */ + +#ifndef tests_storePerfTools_jrnlPerf_Journal_h_ +#define tests_storePerfTools_jrnlPerf_Journal_h_ + +#ifdef JOURNAL2 +# include "qpid/asyncStore/jrnl2/AioCallback.h" +# include "qpid/asyncStore/jrnl2/AsyncJournal.h" +# include "qpid/asyncStore/jrnl2/ScopedLock.h" +#else +# include "jrnl/aio_callback.hpp" +# include "jrnl/smutex.hpp" +#endif + +#include <stdint.h> // uint16_t, uint32_t + +#ifdef JOURNAL2 +# define X_AIO_CALLBACK qpid::asyncStore::jrnl2::AioCallback +# define X_AIO_RD_CALLBACK readAioCompleteCallback +# define X_AIO_WR_CALLBACK writeAioCompleteCallback +# define X_ASYNC_JOURNAL qpid::asyncStore::jrnl2::AsyncJournal +# define X_DATA_TOKEN qpid::asyncStore::jrnl2::DataToken +# define X_SCOPED_MUTEX qpid::asyncStore::jrnl2::ScopedMutex +#else +# define X_AIO_CALLBACK mrg::journal::aio_callback +# define X_AIO_RD_CALLBACK rd_aio_cb +# define X_AIO_WR_CALLBACK wr_aio_cb +# define X_ASYNC_JOURNAL mrg::journal::jcntl +# define X_DATA_TOKEN mrg::journal::data_tok +# define X_SCOPED_MUTEX mrg::journal::smutex +#endif + +#ifndef JOURNAL2 +namespace mrg { +namespace journal { +class jcntl; +}} // namespace mrg::journal +namespace qpid { +namespace asyncStore { +namespace jrnl2 { +class AsyncJournal; +}}} // namespace qpid::asyncStore::jrnl2 +#endif + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +/** + * \brief Test journal instance. Each queue to be tested results in one instance of this class. + * + * Journal test harness which contains the journal to be tested. Each queue to be tested in the test parameters + * results in one instance of this class being instantiated, and consequently one set of journals on disk. The + * journal instance is provided as a pointer to the constructor. + */ +class Journal : public X_AIO_CALLBACK +{ +public: + /** + * \brief Constructor + * + * \param numMsgs Number of messages per thread to be enqueued then dequeued (ie both ways through broker) + * \param msgSize Size of each message being enqueued + * \param msgData Pointer to message content (all messages have identical content) + * \param jrnlPtr Pinter to journal instance which is to be tested + */ + Journal(const uint32_t numMsgs, + const uint32_t msgSize, + const char* msgData, + X_ASYNC_JOURNAL* const jrnlPtr); + + /** + * \brief virtual destructor + */ + virtual ~Journal(); + + /** + * \brief Worker thread enqueue task + * + * This function is the worker thread enqueue task entry point. It enqueues _numMsgs onto the journal instance. + * A data tokens is created for each record, this is the start of the data token life cycle. All possible + * returns from the journal are handled appropriately. Since the enqueue threads also perform + * callbacks on completed AIO operations, the data tokens from completed enqueues are placed onto the + * unprocessed callback list (_unprocCallbackList) for dequeueing by the dequeue worker thread(s). + * + * This function must be thread safe. + */ + void* runEnqueues(); + + /** + * \brief Worker thread dequeue task + * + * This function is the worker thread dequeue task entry point. It dequeues messages which are on the + * unprocessed callback list (_unprocCallbackList). + * + * This function must be thread safe. + */ + void* runDequeues(); + + /** + * \brief Helper function to launch the run() function when starting a thread. + */ + static void* startEnqueues(void* ptr); + + /** + * \brief Helper function to launch the run() function when starting a thread. + */ + static void* startDequeues(void* ptr); + + /** + * \brief Write callback function. When AIO operations return, this function is called. + * + * When AIO operations return, this function will sort the enqueue ops from the rest and place the data tokens + * of these records onto the unprocessed callback list (_unprocCallbackList) for dequeueing by another thread. + * + * Returning dequeue ops have their data tokens destroyed, as this is the end of the life cycle of the data + * tokens. + * + * Required by all subclasses of mrg::journal::aio_callback. + * + * \param dataTokenList A vector of data tokens for those messages which have completed their AIO write + * operations + */ + void X_AIO_WR_CALLBACK(std::vector<X_DATA_TOKEN*>& dataTokenList); + + /** + * \brief Read callback function. When read AIO operations return, this function is called. + * + * Not used in this test, but required by all subclasses of mrg::journal::aio_callback. + * + * \param buffPageCtrlBlkIndexList A vector of indices to the buffer page control blocks for completed reads + */ + void X_AIO_RD_CALLBACK(std::vector<uint16_t>& buffPageCtrlBlkIndexList); + +protected: + const uint32_t m_numMsgs; ///< Number of messages to be processed by this journal instance + const uint32_t m_msgSize; ///< Size of each message (in bytes) + const char* m_msgData; ///< Pointer to message content to be used for each message. + X_ASYNC_JOURNAL* const m_jrnlPtr; ///< Journal instance pointer + std::vector<X_DATA_TOKEN*> m_unprocCallbacks; ///< List of unprocessed callbacks to be dequeued + X_SCOPED_MUTEX m_unprocCallbacksMutex; ///< Mutex which protects the unprocessed callback queue + + +}; + +}}} // namespace tests::storePerftools::jrnlPerf + +#endif // tests_storePerfTools_jrnlPerf_Journal_h_ diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp new file mode 100644 index 0000000000..2b07619041 --- /dev/null +++ b/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.cpp @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalParameters.cpp + */ + +#include "JournalParameters.h" + +#include <cstdlib> // std::atof, std::atoi, std::atol + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +#ifndef JOURNAL2 +// static declarations - for jrnl2, these are inherited +std::string JournalParameters::s_defaultJrnlDir = "/tmp/store"; +std::string JournalParameters::s_defaultJrnlBaseFileName = "JournalData"; +uint16_t JournalParameters::s_defaultNumJrnlFiles = 8; +uint32_t JournalParameters::s_defaultJrnlFileSize_sblks = 3072; +uint16_t JournalParameters::s_defaultWriteBuffNumPgs = 32; +uint32_t JournalParameters::s_defaultWriteBuffPgSize_sblks = 128; +#endif + +JournalParameters::JournalParameters() : +#ifdef JOURNAL2 + qpid::asyncStore::jrnl2::JournalParameters() +#else + Parameters(), + m_jrnlDir(s_defaultJrnlDir), + m_jrnlBaseFileName(s_defaultJrnlBaseFileName), + m_numJrnlFiles(s_defaultNumJrnlFiles), + m_jrnlFileSize_sblks(s_defaultJrnlFileSize_sblks), + m_writeBuffNumPgs(s_defaultWriteBuffNumPgs), + m_writeBuffPgSize_sblks(s_defaultWriteBuffPgSize_sblks) +#endif +{} + +JournalParameters::JournalParameters(const std::string& jrnlDir, + const std::string& jrnlBaseFileName, + const uint16_t numJrnlFiles, + const uint32_t jrnlFileSize_sblks, + const uint16_t writeBuffNumPgs, + const uint32_t writeBuffPgSize_sblks) : +#ifdef JOURNAL2 + qpid::asyncStore::jrnl2::JournalParameters(jrnlDir, jrnlBaseFileName, numJrnlFiles, jrnlFileSize_sblks, writeBuffNumPgs, + writeBuffPgSize_sblks) +#else + Parameters(), + m_jrnlDir(jrnlDir), + m_jrnlBaseFileName(jrnlBaseFileName), + m_numJrnlFiles(numJrnlFiles), + m_jrnlFileSize_sblks(jrnlFileSize_sblks), + m_writeBuffNumPgs(writeBuffNumPgs), + m_writeBuffPgSize_sblks(writeBuffPgSize_sblks) +#endif +{} + +#ifdef JOURNAL2 +JournalParameters::JournalParameters(const qpid::asyncStore::jrnl2::JournalParameters& jp) : + qpid::asyncStore::jrnl2::JournalParameters(jp) +{} +#endif + +JournalParameters::JournalParameters(const JournalParameters& jp) : +#ifdef JOURNAL2 + qpid::asyncStore::jrnl2::JournalParameters(jp) +#else + Parameters(), + m_jrnlDir(jp.m_jrnlDir), + m_jrnlBaseFileName(jp.m_jrnlBaseFileName), + m_numJrnlFiles(jp.m_numJrnlFiles), + m_jrnlFileSize_sblks(jp.m_jrnlFileSize_sblks), + m_writeBuffNumPgs(jp.m_writeBuffNumPgs), + m_writeBuffPgSize_sblks(jp.m_writeBuffPgSize_sblks) +#endif +{} + +JournalParameters::~JournalParameters() +{} + +void +JournalParameters::toStream(std::ostream& os) const +{ + os << "Journal Parameters:" << std::endl; + os << " jrnlDir = \"" << m_jrnlDir << "\"" << std::endl; + os << " jrnlBaseFileName = \"" << m_jrnlBaseFileName << "\"" << std::endl; + os << " numJrnlFiles = " << m_numJrnlFiles << std::endl; + os << " jrnlFileSize_sblks = " << m_jrnlFileSize_sblks << std::endl; + os << " writeBuffNumPgs = " << m_writeBuffNumPgs << std::endl; + os << " writeBuffPgSize_sblks = " << m_writeBuffPgSize_sblks << std::endl; +} + +bool +JournalParameters::parseArg(const int arg, + const char* optarg) +{ + switch(arg) { + case 'j': + m_jrnlDir.assign(optarg); + break; + case 'b': + m_jrnlBaseFileName.assign(optarg); + break; + case 'f': + m_numJrnlFiles = uint16_t(std::atoi(optarg)); + break; + case 's': + m_jrnlFileSize_sblks = uint32_t(std::atol(optarg)); + break; + case 'p': + m_writeBuffNumPgs = uint16_t(std::atoi(optarg)); + break; + case 'c': + m_writeBuffPgSize_sblks = uint32_t(std::atol(optarg)); + break; + default: + return false; + } + return true; +} + +// static +void +JournalParameters::printArgs(std::ostream& os) +{ + os << "Journal parameters:" << std::endl; + os << " -j --jrnl_dir: Store directory [\"" + << JournalParameters::s_defaultJrnlDir << "\"]" << std::endl; + os << " -b --jrnl_base_filename: Base name for journal files [\"" + << JournalParameters::s_defaultJrnlBaseFileName << "\"]" << std::endl; + os << " -f --num_jfiles: Number of journal files [" + << JournalParameters::s_defaultNumJrnlFiles << "]" << std::endl; + os << " -s --jfsize_sblks: Size of each journal file in sblks (512 byte blocks) [" + << JournalParameters::s_defaultJrnlFileSize_sblks << "]" << std::endl; + os << " -p --wcache_num_pages: Number of write buffer pages [" + << JournalParameters::s_defaultWriteBuffNumPgs << "]" << std::endl; + os << " -c --wcache_pgsize_sblks: Size of each write buffer page in sblks (512 byte blocks) [" + << JournalParameters::s_defaultWriteBuffPgSize_sblks << "]" << std::endl; +} + +// static +std::string +JournalParameters::shortArgs() +{ + return "j:b:f:s:p:c:"; +} + +}}} // namespace tests::storePerftools::jrnlPerf diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h b/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h new file mode 100644 index 0000000000..ab7f864a91 --- /dev/null +++ b/cpp/src/tests/storePerfTools/jrnlPerf/JournalParameters.h @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file JournalParameters.h + */ + +#ifndef tests_storePerfTools_jrnlPerf_JournalParameters_h_ +#define tests_storePerfTools_jrnlPerf_JournalParameters_h_ + +#include "tests/storePerfTools/common/Parameters.h" + +#ifdef JOURNAL2 +# include "qpid/asyncStore/jrnl2/JournalParameters.h" +#endif + +#include <stdint.h> // uint16_6, uint32_t + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +/** + * \brief Stuct for aggregating the common journal parameters + * + * This struct is used to aggregate and keep together all the common journal parameters. These affect the journal + * geometry and buffers. The test parameters are aggregated in class JrnlPerfTestParameters. + */ +class JournalParameters : +#ifdef JOURNAL2 + public qpid::asyncStore::jrnl2::JournalParameters, +#endif + public tests::storePerftools::common::Parameters +{ +public: +#ifndef JOURNAL2 + // static default store params + static std::string s_defaultJrnlDir; ///< Default journal directory + static std::string s_defaultJrnlBaseFileName; ///< Default journal base file name + static uint16_t s_defaultNumJrnlFiles; ///< Default number of journal data files + static uint32_t s_defaultJrnlFileSize_sblks; ///< Default journal data file size in softblocks + static uint16_t s_defaultWriteBuffNumPgs; ///< Default number of write buffer pages + static uint32_t s_defaultWriteBuffPgSize_sblks; ///< Default size of each write buffer page in softblocks + + std::string m_jrnlDir; ///< Journal directory + std::string m_jrnlBaseFileName; ///< Journal base file name + uint16_t m_numJrnlFiles; ///< Number of journal data files + uint32_t m_jrnlFileSize_sblks; ///< Journal data file size in softblocks + uint16_t m_writeBuffNumPgs; ///< Number of write buffer pages + uint32_t m_writeBuffPgSize_sblks; ///< Size of each write buffer page in softblocks +#endif + + /** + * \brief Default constructor + * + * Default constructor. Uses the default values for all parameters. + */ + JournalParameters(); + + /** + * \brief Constructor + * + * Convenience constructor. + * + * \param jrnlDir Journal directory + * \param jrnlBaseFileName Journal base file name + * \param numJrnlFiles Number of journal data files + * \param jrnlFileSize_sblks Journal data file size in softblocks + * \param writeBuffNumPgs Number of write buffer pages + * \param writeBuffPgSize_sblks Size of each write buffer page in softblocks + */ + JournalParameters(const std::string& jrnlDir, + const std::string& jrnlBaseFileName, + const uint16_t numJrnlFiles, + const uint32_t jrnlFileSize_sblks, + const uint16_t writeBuffNumPgs, + const uint32_t writeBuffPgSize_sblks); + + /** + * \brief Copy constructor + * + * \param jp Reference to JrnlParameters instance to be copied + */ +#ifdef JOURNAL2 + JournalParameters(const qpid::asyncStore::jrnl2::JournalParameters& jp); +#endif + JournalParameters(const JournalParameters& jp); + + /** + * \brief Virtual destructor + */ + virtual ~JournalParameters(); + + virtual bool parseArg(const int arg, + const char* optarg); + + static void printArgs(std::ostream& os); + + static std::string shortArgs(); + + /*** + * \brief Stream the journal parameters to an output stream + * + * Convenience feature which streams a multi-line representation of all the journal parameters, one per line to + * an output stream. + * + * \param os Output stream to which the class data is to be streamed + */ + void toStream(std::ostream& os = std::cout) const; + +}; + +}}} // namespace tests::storePerftools::jrnlPerf + +#endif // tests_storePerfTools_jrnlPerf_JournalParameters_h_ diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp new file mode 100644 index 0000000000..17a9c9b6cb --- /dev/null +++ b/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.cpp @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerfTest.cpp + */ + +#include "PerfTest.h" + +#include "Journal.h" +#include "JournalParameters.h" + +#include "tests/storePerfTools/version.h" +#include "tests/storePerfTools/common/ScopedTimer.h" +#include "tests/storePerfTools/common/TestParameters.h" +#include "tests/storePerfTools/common/Thread.h" + +#ifdef JOURNAL2 +# include "qpid/asyncStore/jrnl2/AsyncJournal.h" +# include "qpid/asyncStore/jrnl2/JournalDirectory.h" +#else +# include "jrnl/jcntl.hpp" +# include "jrnl/jdir.hpp" +#endif + +#include <deque> +#include <getopt.h> // getopt_long(), required_argument, no_argument +#include <iomanip> // std::setw() std::setfill() +#include <sstream> // std::ostringstream +#include <stdint.h> // uint16_t, uint32_t + +#ifdef ECLIPSE_CDT_ANNOYANCE // This prevents problems with Eclipse CODAN, which can't see this in getopt.h + struct option; + extern int getopt_long (int, char *const *, const char *, const struct option *, int *) __THROW; +# define no_argument 0 +# define required_argument 1 +# define optional_argument 2 +#endif + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +PerfTest::PerfTest(const tests::storePerftools::common::TestParameters& tp, + const JournalParameters& jp) : + Streamable(), + m_testParams(tp), + m_jrnlParams(jp), + m_testResult(tp), + m_msgData(new char[tp.m_msgSize]) +{} + +PerfTest::~PerfTest() +{ + delete[] m_msgData; +} + +void +PerfTest::prepareJournals(std::vector<Journal*>& jrnlList) +{ +#ifdef JOURNAL2 + if (qpid::asyncStore::jrnl2::JournalDirectory::s_exists(m_jrnlParams.m_jrnlDir)) { + qpid::asyncStore::jrnl2::JournalDirectory::s_destroy(m_jrnlParams.m_jrnlDir); + } + qpid::asyncStore::jrnl2::JournalDirectory::s_create(m_jrnlParams.m_jrnlDir); + qpid::asyncStore::jrnl2::AsyncJournal* jp; +#else + if (mrg::journal::jdir::exists(m_jrnlParams.m_jrnlDir)) { + mrg::journal::jdir::delete_dir(m_jrnlParams.m_jrnlDir); + } + mrg::journal::jdir::create_dir(m_jrnlParams.m_jrnlDir); + mrg::journal::jcntl* jp; +#endif + Journal* ptp; + for (uint16_t j = 0; j < m_testParams.m_numQueues; j++) { + std::ostringstream jname; + jname << "jrnl_" << std::setw(4) << std::setfill('0') << j; + std::ostringstream jdir; + jdir << m_jrnlParams.m_jrnlDir << "/" << jname.str(); +#ifdef JOURNAL2 + jp = new qpid::asyncStore::jrnl2::AsyncJournal(jname.str(), jdir.str(), m_jrnlParams.m_jrnlBaseFileName); +#else + jp = new mrg::journal::jcntl(jname.str(), jdir.str(), m_jrnlParams.m_jrnlBaseFileName); +#endif + ptp = new Journal(m_testParams.m_numMsgs, m_testParams.m_msgSize, m_msgData, jp); +#ifdef JOURNAL2 + jp->initialize(&m_jrnlParams, ptp); +#else + jp->initialize(m_jrnlParams.m_numJrnlFiles, false, m_jrnlParams.m_numJrnlFiles, + m_jrnlParams.m_jrnlFileSize_sblks, m_jrnlParams.m_writeBuffNumPgs, + m_jrnlParams.m_writeBuffPgSize_sblks, ptp); +#endif + + jrnlList.push_back(ptp); + } +} + +void +PerfTest::destroyJournals(std::vector<Journal*>& jrnlList) +{ + while (jrnlList.size()) { + delete jrnlList.back(); + jrnlList.pop_back(); + } +} + +void +PerfTest::run() +{ + std::vector<Journal*> jrnlList; + prepareJournals(jrnlList); + + std::deque<tests::storePerftools::common::Thread*> threads; + tests::storePerftools::common::Thread* tp; + { // --- Start of timed section --- + tests::storePerftools::common::ScopedTimer st(m_testResult); + + for (uint16_t q = 0; q < m_testParams.m_numQueues; q++) { + for (uint16_t t = 0; t < m_testParams.m_numEnqThreadsPerQueue; t++) { + tp = new tests::storePerftools::common::Thread(jrnlList[q]->startEnqueues, reinterpret_cast<void*>(jrnlList[q])); + threads.push_back(tp); + } + for (uint16_t dt = 0; dt < m_testParams.m_numDeqThreadsPerQueue; ++dt) { + tp = new tests::storePerftools::common::Thread(jrnlList[q]->startDequeues, reinterpret_cast<void*>(jrnlList[q])); + threads.push_back(tp); + } + } + + while (threads.size()) { + threads.front()->join(); + delete threads.front(); + threads.pop_front(); + } + } // --- End of timed section --- + destroyJournals(jrnlList); +} + +void +PerfTest::toStream(std::ostream& os) const +{ + os << m_testParams << std::endl; + os << m_jrnlParams << std::endl; + os << m_testResult << std::endl; +} + +void +printArgs(std::ostream& os) +{ + os << " -h --help: This help message" << std::endl; + os << std::endl; + + tests::storePerftools::common::TestParameters::printArgs(os); + os << std::endl; + + JournalParameters::printArgs(os); + os << std::endl; +} + +bool +readArgs(int argc, + char** argv, + tests::storePerftools::common::TestParameters& tp, + JournalParameters& jp) +{ + /// \todo TODO: At some point, find an easy way to aggregate these from JrnlPerfTestParameters and JrnlParameters themselves. + static struct option long_options[] = { + {"help", no_argument, 0, 'h'}, + {"version", no_argument, 0, 'v'}, + + // Test params + {"num_msgs", required_argument, 0, 'm'}, + {"msg_size", required_argument, 0, 'S'}, + {"num_queues", required_argument, 0, 'q'}, + {"num_enq_threads_per_queue", required_argument, 0, 'e'}, + {"num_deq_threads_per_queue", required_argument, 0, 'd'}, + + // Journal params + {"jrnl_dir", required_argument, 0, 'j'}, + {"jrnl_base_filename", required_argument, 0, 'b'}, + {"num_jfiles", required_argument, 0, 'f'}, + {"jfsize_sblks", required_argument, 0, 's'}, + {"wcache_num_pages", required_argument, 0, 'p'}, + {"wcache_pgsize_sblks", required_argument, 0, 'c'}, + + {0, 0, 0, 0} + }; + + bool err = false; + bool ver = false; + int c = 0; + while (true) { + int option_index = 0; + std::ostringstream oss; + oss << "hv" << tests::storePerftools::common::TestParameters::shortArgs() << JournalParameters::shortArgs(); + c = getopt_long(argc, argv, oss.str().c_str(), long_options, &option_index); + if (c == -1) break; + if (c == 'v') { + std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl; + ver = true; + break; + } + err = !(tp.parseArg(c, optarg) || jp.parseArg(c, optarg)); + } + if (err) { + std::cout << std::endl; + printArgs(); + } + return err || ver; +} + +}}} // namespace tests::storePerftools::jrnlPerf + +// ----------------------------------------------------------------- + +int +main(int argc, char** argv) +{ + tests::storePerftools::common::TestParameters tp; + tests::storePerftools::jrnlPerf::JournalParameters jp; + if (tests::storePerftools::jrnlPerf::readArgs(argc, argv, tp, jp)) return 1; + tests::storePerftools::jrnlPerf::PerfTest jpt(tp, jp); + jpt.run(); + std::cout << jpt << std::endl; + return 0; +} diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h b/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h new file mode 100644 index 0000000000..b105bc0488 --- /dev/null +++ b/cpp/src/tests/storePerfTools/jrnlPerf/PerfTest.h @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerfTest.h + */ + +#ifndef tests_storePerfTools_jrnlPerf_PerfTest_h_ +#define tests_storePerfTools_jrnlPerf_PerfTest_h_ + +#include "TestResult.h" +#include "tests/storePerfTools/common/Streamable.h" + +#include <vector> + +namespace tests { +namespace storePerftools { +namespace common { +class TestParameters; +} +namespace jrnlPerf { + +class Journal; +class JournalParameters; + +/** + * \brief Main test class; Create an instance and execute run() + * + * Main test class which aggregates the components of a test. + */ +class PerfTest : public tests::storePerftools::common::Streamable +{ +public: + /** + * \brief Constructor + * + * \param tp Test parameters for the test + * \param jp Journal parameters for all queues (journals) in the test + */ + PerfTest(const tests::storePerftools::common::TestParameters& tp, + const JournalParameters& jp); + + /** + * \brief Virtual destructor + */ + virtual ~PerfTest(); + + /** + * \brief Runs the test and prints out the results. + * + * Runs the test as set by the test parameters and journal parameters. + */ + void run(); + + /** + * \brief Stream the test setup and results to an output stream + * + * Convenience feature which streams the test setup and results to an output stream. + * + * \param os Output stream to which the test setup and results are to be streamed. + */ + void toStream(std::ostream& os = std::cout) const; + +protected: + const tests::storePerftools::common::TestParameters& m_testParams; ///< Ref to a struct containing test params + const JournalParameters& m_jrnlParams; ///< Ref to a struct containing the journal parameters + TestResult m_testResult; ///< Journal performance object + const char* m_msgData; ///< Pointer to msg data, which is the same for all messages + + /** + * \brief Creates journals and JrnlInstance classes for all journals (queues) to be tested + * + * Creates a new journal instance and JrnlInstance instance for each queue. The journals are initialized + * which creates a new set of journal files on the local storage media (which is determined by path in + * JrnlParameters._jrnlDir). This activity is not timed, and is not a part of the performance test per se. + * + * \param jrnlList List which will be filled with pointers to the newly prepared journals + */ + void prepareJournals(std::vector<Journal*>& jrnlList); + + /** + * \brief Destroy the journal instances in list jrnlList + * + * \param jrnlList List of pointers to journals to be destroyed + */ + void destroyJournals(std::vector<Journal*>& jrnlList); + +}; + +/** + * \brief Print out the program arguments + * + * Print out the arguments to the performance program if requested by help or a parameter error. + * + * \param os Stream to which the arguments should be streamed. + */ +void printArgs(std::ostream& os = std::cout); + +/** + * \brief Process the command-line arguments + * + * Process the command-line arguments and populate the JrnlPerfTestParameters and JrnlParameters structs. Only the + * arguments supplied are on the command-line are changed in these structs, the others remain unchanged. It is + * important therefore to make sure that defaults are pre-loaded (the default behavior of the default constructors + * for these structs). + * + * \param argc Number of command-line arguments. Process directly from main(). + * \param argv Pointer to array of command-line argument pointers. Process directly from main(). + * \param tp Reference to test parameter object. Only params on the command-line are changed. + * \param jp Reference to journal parameter object. Only params on the command-line are changed. + */ +bool readArgs(int argc, + char** argv, + tests::storePerftools::common::TestParameters& tp, + JournalParameters& jp); + +}}} // namespace tests::storePerftools::jrnlPerf + +#endif // tests_storePerfTools_jrnlPerf_PerfTest_h_ diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp b/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp new file mode 100644 index 0000000000..9fe214726d --- /dev/null +++ b/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.cpp @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.cpp + */ + +#include "TestResult.h" + +#include <stdint.h> // uint32_t + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +TestResult::TestResult(const tests::storePerftools::common::TestParameters& tp) : + tests::storePerftools::common::TestResult(), + m_testParams(tp) +{} + +TestResult::~TestResult() +{} + +void +TestResult::toStream(std::ostream& os) const +{ + double msgsRate; + os << "TEST RESULTS:" << std::endl; + os << " Msgs per thread: " << m_testParams.m_numMsgs << std::endl; + os << " Msg size: " << m_testParams.m_msgSize << std::endl; + os << " No. queues: " << m_testParams.m_numQueues << std::endl; + os << " No. enq threads/queue: " << m_testParams.m_numEnqThreadsPerQueue << std::endl; + os << " No. deq threads/queue: " << m_testParams.m_numDeqThreadsPerQueue << std::endl; + os << " Time taken: " << m_elapsed << " sec" << std::endl; + uint32_t msgsPerQueue = m_testParams.m_numMsgs * m_testParams.m_numEnqThreadsPerQueue; + if (m_testParams.m_numQueues > 1) { + msgsRate = double(msgsPerQueue) / m_elapsed; + os << " No. msgs per queue: " << msgsPerQueue << std::endl; + os << "Per queue msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; + os << " " << (msgsRate * m_testParams.m_msgSize / 1e6) << " MB/sec" << std::endl; + } + uint32_t totalMsgs = msgsPerQueue * m_testParams.m_numQueues; + msgsRate = double(totalMsgs) / m_elapsed; + os << " Total no. msgs: " << totalMsgs << std::endl; + os << " Broker msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; + os << " " << (msgsRate * m_testParams.m_msgSize / 1e6) << " MB/sec" << std::endl; +} + +}}} // namespace tests::storePerftools::jrnlPerf diff --git a/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h b/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h new file mode 100644 index 0000000000..dae09a6032 --- /dev/null +++ b/cpp/src/tests/storePerfTools/jrnlPerf/TestResult.h @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.h + */ + +#ifndef tests_storePerfTools_jrnlPerf_TestResult_h_ +#define tests_storePerfTools_jrnlPerf_TestResult_h_ + +#include "tests/storePerfTools/common/TestParameters.h" +#include "tests/storePerfTools/common/TestResult.h" + +namespace tests { +namespace storePerftools { +namespace jrnlPerf { + +class TestOptions; + +/** + * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal. + * + * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the + * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the + * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput. + * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor. + * + * Results are available through the use of toStream(), toString() or the << operators. + * + * Output is in the following format: + * <pre> + * TEST RESULTS: + * Msgs per thread: 10000 + * Msg size: 2048 + * No. queues: 2 + * No. threads/queue: 2 + * Time taken: 1.6626 sec + * Total no. msgs: 40000 + * Msg throughput: 24.0587 kMsgs/sec + * 49.2723 MB/sec + * </pre> + */ +class TestResult : public tests::storePerftools::common::TestResult +{ +public: + /** + * \brief Constructor + * + * Constructor. Will start the time interval measurement. + * + * \param tp Test parameter details used to calculate the performance results. + */ + TestResult(const tests::storePerftools::common::TestParameters& tp); + + /** + * \brief Virtual destructor + */ + virtual ~TestResult(); + + /** + * \brief Stream the performance test results to an output stream + * + * Convenience feature which streams a multi-line performance result an output stream. + * + * \param os Output stream to which the results are to be streamed + */ + void toStream(std::ostream& os = std::cout) const; + +protected: + tests::storePerftools::common::TestParameters m_testParams; ///< Test parameters used for performance calculations + +}; + +}}} // namespace tests::storePerftools::jrnlPerf + +#endif // tests_storePerfTools_jrnlPerf_TestResult_h_ diff --git a/cpp/src/tests/storePerfTools/version.h b/cpp/src/tests/storePerfTools/version.h new file mode 100644 index 0000000000..311b145330 --- /dev/null +++ b/cpp/src/tests/storePerfTools/version.h @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file version.h + */ + +#ifndef tests_storePerftools_version_h_ +#define tests_storePerftools_version_h_ + +#include <iostream> +#include <sstream> + +namespace tests { +namespace storePerftools { + +static const int versionMajor = 0; +static const int versionMinor = 0; +static const int versionRevision = 1; + +std::string name() { + return "Qpid async store perftools"; +} + +std::string version() { + std::ostringstream oss; + oss << versionMajor << "." << versionMinor << "." << versionRevision; + return oss.str(); +} + +}} // namespace tests::perftools + +#endif // tests_storePerftools_version_h_ |