diff options
Diffstat (limited to 'cpp/src/tests/storePerfTools/asyncPerf')
14 files changed, 1770 insertions, 0 deletions
diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp new file mode 100644 index 0000000000..75fc921494 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.cpp @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableMessage.cpp + */ + +#include "MockPersistableMessage.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// --- Inner class Queue::MessageContext --- + +MockPersistableMessage::MessageContext::MessageContext(MockPersistableMessagePtr msg, + const qpid::asyncStore::AsyncOperation::opCode op, + MockPersistableQueue* q) : + m_msg(msg), + m_op(op), + m_q(q) +{} + +MockPersistableMessage::MessageContext::~MessageContext() +{} + +const char* +MockPersistableMessage::MessageContext::getOp() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +void +MockPersistableMessage::MessageContext::destroy() +{ + delete this; +} + +// --- Class MockPersistableMessage --- + + +MockPersistableMessage::MockPersistableMessage(const char* msgData, + const uint32_t msgSize, + AsyncStoreImplPtr store, + const bool persistent) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast<size_t>(msgSize)), + m_persistent(persistent), + m_msgHandle(store->createMessageHandle(this)) +{} + +MockPersistableMessage::~MockPersistableMessage() +{} + +// static +void +MockPersistableMessage::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc) +{ + if (bc) { + MessageContext* mc = dynamic_cast<MessageContext*>(bc); + if (mc->m_msg) { + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Message pid=0x" << std::hex << mc->m_msg->m_persistenceId << std::dec << ": Operation " + << mc->getOp() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(mc->m_op) { + case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: + mc->m_msg->dequeueComplete(mc); + break; + case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: + mc->m_msg->enqueueComplete(mc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::MockPersistableMessage::handleAsyncResult(): Unknown async queue operation: " << mc->m_op; + throw qpid::Exception(oss.str()); + }; + } + } + } + if (bc) delete bc; + if (res) delete res; +} + +qpid::broker::MessageHandle& +MockPersistableMessage::getHandle() +{ + return m_msgHandle; +} + +void +MockPersistableMessage::setPersistenceId(uint64_t id) const +{ + m_persistenceId = id; +} + +uint64_t +MockPersistableMessage::getPersistenceId() const +{ + return m_persistenceId; +} + +void +MockPersistableMessage::encode(qpid::framing::Buffer& buffer) const +{ + buffer.putRawData(m_msg); +} + +uint32_t +MockPersistableMessage::encodedSize() const +{ + return static_cast<uint32_t>(m_msg.size()); +} + +void +MockPersistableMessage::allDequeuesComplete() +{} + +uint32_t +MockPersistableMessage::encodedHeaderSize() const +{ + return 0; +} + +bool +MockPersistableMessage::isPersistent() const +{ + return m_persistent; +} + +uint64_t +MockPersistableMessage::getSize() +{ + return m_msg.size(); +} + +void +MockPersistableMessage::write(char* target) +{ + ::memcpy(target, m_msg.data(), m_msg.size()); +} + +// protected +void +MockPersistableMessage::enqueueComplete(const MessageContext* /*mc*/) +{ +//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl; +} + +// protected +void +MockPersistableMessage::dequeueComplete(const MessageContext* /*mc*/) +{ +//std::cout << "~~~~~ Message pid=0x" << std::hex << m_persistenceId << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h new file mode 100644 index 0000000000..a139328690 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableMessage.h @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableMessage.h + */ + +#ifndef tests_storePerfTools_asyncPerf_MockPersistableMessage_h_ +#define tests_storePerfTools_asyncPerf_MockPersistableMessage_h_ + +#include "qpid/asyncStore/AsyncOperation.h" +#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/BrokerContext.h" +#include "qpid/broker/MessageHandle.h" +#include "qpid/broker/PersistableMessage.h" + +#include <stdint.h> // uint32_t + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockPersistableMessage; +class MockPersistableQueue; + +typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr; +typedef boost::shared_ptr<MockPersistableMessage> MockPersistableMessagePtr; + +class MockPersistableMessage: public qpid::broker::PersistableMessage, qpid::broker::DataSource +{ +public: + class MessageContext : public qpid::broker::BrokerContext + { + public: + MessageContext(MockPersistableMessagePtr msg, + const qpid::asyncStore::AsyncOperation::opCode op, + MockPersistableQueue* q); + virtual ~MessageContext(); + const char* getOp() const; + void destroy(); + MockPersistableMessagePtr m_msg; + const qpid::asyncStore::AsyncOperation::opCode m_op; + MockPersistableQueue* m_q; + }; + + MockPersistableMessage(const char* msgData, + const uint32_t msgSize, + AsyncStoreImplPtr store, + const bool persistent = true); + virtual ~MockPersistableMessage(); + static void handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc); + qpid::broker::MessageHandle& getHandle(); + + // Interface Persistable + virtual void setPersistenceId(uint64_t id) const; + virtual uint64_t getPersistenceId() const; + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + + // Interface PersistableMessage + virtual void allDequeuesComplete(); + virtual uint32_t encodedHeaderSize() const; + virtual bool isPersistent() const; + + // Interface DataStore + virtual uint64_t getSize(); + virtual void write(char* target); + +protected: + mutable uint64_t m_persistenceId; + const std::string m_msg; + const bool m_persistent; + qpid::broker::MessageHandle m_msgHandle; + + // --- Ascnc op completions (called through handleAsyncResult) --- + void enqueueComplete(const MessageContext* mc); + void dequeueComplete(const MessageContext* mc); + +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_MockPersistableMessage_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp new file mode 100644 index 0000000000..ff9b76c421 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.cpp @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableQueue.cpp + */ + +#include "MockPersistableQueue.h" + +#include "MockPersistableMessage.h" +#include "MockTransactionContext.h" +#include "QueuedMessage.h" +#include "TestOptions.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/EnqueueHandle.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// --- Inner class MockPersistableQueue::QueueContext --- + +MockPersistableQueue::QueueContext::QueueContext(MockPersistableQueue* q, + const qpid::asyncStore::AsyncOperation::opCode op) : + m_q(q), + m_op(op) +{} + +MockPersistableQueue::QueueContext::~QueueContext() +{} + +const char* +MockPersistableQueue::QueueContext::getOp() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +void +MockPersistableQueue::QueueContext::destroy() +{ + delete this; +} + +// --- Class MockPersistableQueue --- + +MockPersistableQueue::MockPersistableQueue(const std::string& name, + const qpid::framing::FieldTable& /*args*/, + AsyncStoreImplPtr store, + const TestOptions& to, + const char* msgData) : + m_name(name), + m_store(store), + m_persistenceId(0ULL), + m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. + m_perfTestOpts(to), + m_msgData(msgData) +{ + const qpid::types::Variant::Map qo; + m_queueHandle = m_store->createQueueHandle(m_name, qo); + qpid::broker::BrokerContext* bc = new QueueContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE); + m_store->submitCreate(m_queueHandle, + dynamic_cast<const qpid::broker::DataSource*>(this), + &handleAsyncResult, + bc); +} + +MockPersistableQueue::~MockPersistableQueue() +{ +// m_store->flush(*this); + // TODO: Make destroying the store a test parameter +// m_store->destroy(*this); +// m_store = 0; +} + +// static +void +MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc) +{ + if (bc && res) { + QueueContext* qc = dynamic_cast<QueueContext*>(bc); + if (qc->m_q) { + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Queue name=\"" << qc->m_q->m_name << "\": Operation " << qc->getOp() << ": failure " + << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(qc->m_op) { + case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: + qc->m_q->createComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::QUEUE_FLUSH: + qc->m_q->flushComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: + qc->m_q->destroyComplete(qc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->m_op; + throw qpid::Exception(oss.str()); + }; + } + } + } + if (bc) delete bc; + if (res) delete res; +} + +qpid::broker::QueueHandle& +MockPersistableQueue::getHandle() +{ + return m_queueHandle; +} + +void* +MockPersistableQueue::runEnqueues() +{ + uint32_t numMsgs = 0; + uint16_t txnCnt = 0; + const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0; + MockTransactionContextPtr txn; + while (numMsgs < m_perfTestOpts.m_numMsgs) { + if (useTxn && txnCnt == 0) { + txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() + } + MockPersistableMessagePtr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true)); + msg->setPersistenceId(m_store->getNextRid()); + qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle); + MockPersistableMessage::MessageContext* msgCtxt = new MockPersistableMessage::MessageContext(msg, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, this); + if (useTxn) { + m_store->submitEnqueue(enqHandle, + txn->getHandle(), + &MockPersistableMessage::handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(msgCtxt)); + } else { + m_store->submitEnqueue(enqHandle, + &MockPersistableMessage::handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(msgCtxt)); + } + QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn)); + push(qm); +//std::cout << "**** push 0x" << std::hex << msg->getPersistenceId() << std::dec << std::endl; + if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) { + txn->commit(); + txnCnt = 0; + } + ++numMsgs; + } + if (txnCnt > 0) { + txn->commit(); + txnCnt = 0; + } + return 0; +} + +void* +MockPersistableQueue::runDequeues() +{ + uint32_t numMsgs = 0; + const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue; + uint16_t txnCnt = 0; + const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0; + MockTransactionContextPtr txn; + QueuedMessagePtr qm; + while (numMsgs < numMsgsToDequeue) { + if (useTxn && txnCnt == 0) { + txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() + } + pop(qm); + if (qm.get()) { + qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle(); + qpid::broker::BrokerContext* bc = new MockPersistableMessage::MessageContext(qm->getMessage(), + qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, + this); + if (useTxn) { + m_store->submitDequeue(enqHandle, + txn->getHandle(), + &MockPersistableMessage::handleAsyncResult, + bc); + } else { + m_store->submitDequeue(enqHandle, + &MockPersistableMessage::handleAsyncResult, + bc); + } + ++numMsgs; + qm.reset(static_cast<QueuedMessage*>(0)); + if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) { + txn->commit(); + txnCnt = 0; + } + } else { +// ::usleep(100); // 0.1 ms TODO: Use a condition variable instead of sleeping/spinning + } + } + if (txnCnt > 0) { + txn->commit(); + txnCnt = 0; + } + return 0; +} + +//static +void* +MockPersistableQueue::startEnqueues(void* ptr) +{ + return reinterpret_cast<MockPersistableQueue*>(ptr)->runEnqueues(); +} + +//static +void* +MockPersistableQueue::startDequeues(void* ptr) +{ + return reinterpret_cast<MockPersistableQueue*>(ptr)->runDequeues(); +} + +void +MockPersistableQueue::encode(qpid::framing::Buffer& buffer) const +{ + buffer.putShortString(m_name); +} + +uint32_t +MockPersistableQueue::encodedSize() const +{ + return m_name.size() + 1; +} + +uint64_t +MockPersistableQueue::getPersistenceId() const +{ + return m_persistenceId; +} + +void +MockPersistableQueue::setPersistenceId(uint64_t persistenceId) const +{ + m_persistenceId = persistenceId; +} + +void +MockPersistableQueue::flush() +{ + //if(m_store) m_store->flush(*this); +} + +const std::string& +MockPersistableQueue::getName() const +{ + return m_name; +} + +void +MockPersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) +{ + if (externalQueueStore != inst && externalQueueStore) + delete externalQueueStore; + externalQueueStore = inst; +} + +uint64_t +MockPersistableQueue::getSize() +{ + return m_persistableData.size(); +} + +void +MockPersistableQueue::write(char* target) +{ + ::memcpy(target, m_persistableData.data(), m_persistableData.size()); +} + +// protected +void +MockPersistableQueue::createComplete(const QueueContext* /*qc*/) +{ +//std::cout << "~~~~~ Queue name=\"" << m_name << "\": createComplete()" << std::endl; +} + +// protected +void +MockPersistableQueue::flushComplete(const QueueContext* /*qc*/) +{ +//std::cout << "~~~~~ Queue name=\"" << m_name << "\": flushComplete()" << std::endl; +} + +// protected +void +MockPersistableQueue::destroyComplete(const QueueContext* /*qc*/) +{ +//std::cout << "~~~~~ Queue name=\"" << m_name << "\": destroyComplete()" << std::endl; +} + +// protected +void +MockPersistableQueue::push(QueuedMessagePtr& qm) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); + m_enqueuedMsgs.push_back(qm); + m_dequeueCondition.notify(); +} + +// protected +void +MockPersistableQueue::pop(QueuedMessagePtr& qm) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); + if (m_enqueuedMsgs.empty()) { + m_dequeueCondition.wait(m_enqueuedMsgsMutex); + } + qm = m_enqueuedMsgs.front(); + if (qm->isTransactional()) { + // The next msg is still in an open transaction, skip and find next non-open-txn msg + MsgEnqListItr i = m_enqueuedMsgs.begin(); + while (++i != m_enqueuedMsgs.end()) { + if (!(*i)->isTransactional()) { + qm = *i; + m_enqueuedMsgs.erase(i); + } + } + } else { + // The next msg is not in an open txn + m_enqueuedMsgs.pop_front(); + } +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h new file mode 100644 index 0000000000..a77f41743c --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockPersistableQueue.h + */ + +#ifndef tests_storePerfTools_asyncPerf_MockPersistableQueue_h_ +#define tests_storePerfTools_asyncPerf_MockPersistableQueue_h_ + +#include "qpid/asyncStore/AsyncOperation.h" +#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/BrokerContext.h" +#include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/QueueHandle.h" +#include "qpid/sys/Condition.h" +#include "qpid/sys/Mutex.h" + +#include <boost/shared_ptr.hpp> +#include <deque> + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +} +namespace framing { +class FieldTable; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class QueuedMessage; +class TestOptions; + +typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr; +typedef boost::shared_ptr<QueuedMessage> QueuedMessagePtr; + +class MockPersistableQueue : public qpid::broker::PersistableQueue, qpid::broker::DataSource +{ +public: + class QueueContext : public qpid::broker::BrokerContext + { + public: + QueueContext(MockPersistableQueue* q, + const qpid::asyncStore::AsyncOperation::opCode op); + virtual ~QueueContext(); + const char* getOp() const; + void destroy(); + MockPersistableQueue* m_q; + const qpid::asyncStore::AsyncOperation::opCode m_op; + }; + + MockPersistableQueue(const std::string& name, + const qpid::framing::FieldTable& args, + AsyncStoreImplPtr store, + const TestOptions& perfTestParams, + const char* msgData); + virtual ~MockPersistableQueue(); + + // --- Async functionality --- + static void handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc); + qpid::broker::QueueHandle& getHandle(); + + // --- Performance test thread entry points --- + void* runEnqueues(); + void* runDequeues(); + static void* startEnqueues(void* ptr); + static void* startDequeues(void* ptr); + + // --- Interface qpid::broker::Persistable --- + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + virtual uint64_t getPersistenceId() const; + virtual void setPersistenceId(uint64_t persistenceId) const; + + // --- Interface qpid::broker::PersistableQueue --- + virtual void flush(); + virtual const std::string& getName() const; + virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); + + // --- Interface DataStore --- + virtual uint64_t getSize(); + virtual void write(char* target); + +protected: + const std::string m_name; + AsyncStoreImplPtr m_store; + mutable uint64_t m_persistenceId; + std::string m_persistableData; + qpid::broker::QueueHandle m_queueHandle; + + // Test params + const TestOptions& m_perfTestOpts; + const char* m_msgData; + + typedef std::deque<QueuedMessagePtr> MsgEnqList; + typedef MsgEnqList::iterator MsgEnqListItr; + MsgEnqList m_enqueuedMsgs; + qpid::sys::Mutex m_enqueuedMsgsMutex; + qpid::sys::Condition m_dequeueCondition; + + // --- Ascnc op completions (called through handleAsyncResult) --- + void createComplete(const QueueContext* qc); + void flushComplete(const QueueContext* qc); + void destroyComplete(const QueueContext* qc); + + // --- Queue functionality --- + void push(QueuedMessagePtr& msg); + void pop(QueuedMessagePtr& msg); +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_MockPersistableQueue_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp new file mode 100644 index 0000000000..e564876daa --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.cpp @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockTransactionContext.cpp + */ + +#include "MockTransactionContext.h" + +#include "QueuedMessage.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// --- Inner class MockTransactionContext::QueueContext --- + +MockTransactionContext::TransactionContext::TransactionContext(MockTransactionContext* tc, + const qpid::asyncStore::AsyncOperation::opCode op) : + m_tc(tc), + m_op(op) +{} + +MockTransactionContext::TransactionContext::~TransactionContext() +{} + +const char* +MockTransactionContext::TransactionContext::getOp() const +{ + return qpid::asyncStore::AsyncOperation::getOpStr(m_op); +} + +void +MockTransactionContext::TransactionContext::destroy() +{ + delete this; +} + +// --- Class MockTransactionContext --- + + +MockTransactionContext::MockTransactionContext(AsyncStoreImplPtr store, + const std::string& xid) : + m_store(store), + m_txnHandle(store->createTxnHandle(xid)), + m_prepared(false), + m_enqueuedMsgs() +{ +//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; +} + +MockTransactionContext::~MockTransactionContext() +{} + +// static +void +MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc) +{ + if (bc && res) { + TransactionContext* tc = dynamic_cast<TransactionContext*>(bc); + if (tc->m_tc) { + if (res->errNo) { + // TODO: Handle async failure here + std::cerr << "Transaction xid=\"" << tc->m_tc->getXid() << "\": Operation " << tc->getOp() << ": failure " + << res->errNo << " (" << res->errMsg << ")" << std::endl; + } else { + // Handle async success here + switch(tc->m_op) { + case qpid::asyncStore::AsyncOperation::TXN_PREPARE: + tc->m_tc->prepareComplete(tc); + break; + case qpid::asyncStore::AsyncOperation::TXN_COMMIT: + tc->m_tc->commitComplete(tc); + break; + case qpid::asyncStore::AsyncOperation::TXN_ABORT: + tc->m_tc->abortComplete(tc); + break; + default: + std::ostringstream oss; + oss << "tests::storePerftools::asyncPerf::MockTransactionContext::handleAsyncResult(): Unknown async operation: " << tc->m_op; + throw qpid::Exception(oss.str()); + }; + } + } + } + if (bc) delete bc; + if (res) delete res; +} + +qpid::broker::TxnHandle& +MockTransactionContext::getHandle() +{ + return m_txnHandle; +} + +bool +MockTransactionContext::is2pc() const +{ + return m_txnHandle.is2pc(); +} + +const std::string& +MockTransactionContext::getXid() const +{ + return m_txnHandle.getXid(); +} + +void +MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); + m_enqueuedMsgs.push_back(qm); +} + +void +MockTransactionContext::prepare() +{ + if (m_txnHandle.is2pc()) { + localPrepare(); + m_prepared = true; + } + std::ostringstream oss; + oss << "MockTransactionContext::prepare(): xid=\"" << getXid() + << "\": Transaction Error: called prepare() on local transaction"; + throw qpid::Exception(oss.str()); +} + +void +MockTransactionContext::abort() +{ + // TODO: Check the following XA transaction semantics: + // Assuming 2PC aborts can occur without a prepare. Do local prepare if not already prepared. + if (!m_prepared) { + localPrepare(); + } + m_store->submitAbort(m_txnHandle, + &handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); +//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + +void +MockTransactionContext::commit() +{ + if (is2pc()) { + if (!m_prepared) { + std::ostringstream oss; + oss << "MockTransactionContext::abort(): xid=\"" << getXid() + << "\": Transaction Error: called commit() without prepare() on 2PC transaction"; + throw qpid::Exception(oss.str()); + } + } else { + localPrepare(); + } + m_store->submitCommit(m_txnHandle, + &handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); +//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + + +// protected +void +MockTransactionContext::localPrepare() +{ + m_store->submitPrepare(m_txnHandle, + &handleAsyncResult, + dynamic_cast<qpid::broker::BrokerContext*>(new TransactionContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); +//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; +} + +// protected +void +MockTransactionContext::prepareComplete(const TransactionContext* /*tc*/) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); + while (!m_enqueuedMsgs.empty()) { + m_enqueuedMsgs.front()->clearTransaction(); + m_enqueuedMsgs.pop_front(); + } +//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": prepareComplete()" << std::endl; +} + + +// protected +void +MockTransactionContext::abortComplete(const TransactionContext* /*tc*/) +{ +//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": abortComplete()" << std::endl; +} + + +// protected +void +MockTransactionContext::commitComplete(const TransactionContext* /*tc*/) +{ +//std::cout << "~~~~~ Transaction xid=\"" << getXid() << "\": commitComplete()" << std::endl; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h new file mode 100644 index 0000000000..35de7374c5 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/MockTransactionContext.h @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MockTransactionContext.h + */ + +#ifndef tests_storePerfTools_asyncPerf_MockTransactionContext_h_ +#define tests_storePerfTools_asyncPerf_MockTransactionContext_h_ + +#include "qpid/asyncStore/AsyncOperation.h" + +#include "qpid/broker/BrokerContext.h" +#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext +#include "qpid/broker/TxnHandle.h" +#include "qpid/sys/Mutex.h" + +#include <boost/shared_ptr.hpp> +#include <deque> + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class QueuedMessage; + +typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr; + +class MockTransactionContext : public qpid::broker::TransactionContext +{ +public: + // NOTE: TransactionContext - Bad naming? This context is the async return handling context for class + // MockTransactionContext async ops. Other classes using this pattern simply use XXXContext for this class + // (e.g. QueueContext in MockPersistableQueue), but in this case it may be confusing. + class TransactionContext : public qpid::broker::BrokerContext + { + public: + TransactionContext(MockTransactionContext* tc, + const qpid::asyncStore::AsyncOperation::opCode op); + virtual ~TransactionContext(); + const char* getOp() const; + void destroy(); + MockTransactionContext* m_tc; + const qpid::asyncStore::AsyncOperation::opCode m_op; + }; + + MockTransactionContext(AsyncStoreImplPtr store, + const std::string& xid = std::string()); + virtual ~MockTransactionContext(); + static void handleAsyncResult(const qpid::broker::AsyncResult* res, + qpid::broker::BrokerContext* bc); + + qpid::broker::TxnHandle& getHandle(); + bool is2pc() const; + const std::string& getXid() const; + void addEnqueuedMsg(QueuedMessage* qm); + + void prepare(); + void abort(); + void commit(); + +protected: + AsyncStoreImplPtr m_store; + qpid::broker::TxnHandle m_txnHandle; + bool m_prepared; + std::deque<QueuedMessage*> m_enqueuedMsgs; + qpid::sys::Mutex m_enqueuedMsgsMutex; + + void localPrepare(); + + // --- Ascnc op completions (called through handleAsyncResult) --- + void prepareComplete(const TransactionContext* tc); + void abortComplete(const TransactionContext* tc); + void commitComplete(const TransactionContext* tc); + +}; + +}}} // namespace tests:storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_MockTransactionContext_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp new file mode 100644 index 0000000000..fe66604774 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.cpp @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerfTest.cpp + */ + +#include "PerfTest.h" + +#include "MockPersistableQueue.h" + +#include "tests/storePerfTools/version.h" +#include "tests/storePerfTools/common/ScopedTimer.h" +#include "tests/storePerfTools/common/Thread.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" + +#include <iomanip> + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +PerfTest::PerfTest(const TestOptions& to, + const qpid::asyncStore::AsyncStoreOptions& aso) : + m_testOpts(to), + m_storeOpts(aso), + m_testResult(to), + m_msgData(new char[to.m_msgSize]), + m_poller(new qpid::sys::Poller), + m_pollingThread(m_poller.get()) +{ + std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize); +} + +PerfTest::~PerfTest() +{ + m_poller->shutdown(); + m_pollingThread.join(); + delete[] m_msgData; +} + +AsyncStoreImplPtr +PerfTest::prepareStore() +{ + AsyncStoreImplPtr store(new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts)); + store->initialize(); + return store; +} + +void +PerfTest::prepareQueues(std::deque<MockPersistableQueuePtr>& jrnlList, + AsyncStoreImplPtr store) +{ + for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { + std::ostringstream qname; + qname << "queue_" << std::setw(4) << std::setfill('0') << i; + MockPersistableQueuePtr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, store, m_testOpts, m_msgData)); + jrnlList.push_back(mpq); + } +} + +void +PerfTest::destroyQueues(std::deque<MockPersistableQueuePtr>& jrnlList) +{ + jrnlList.clear(); +} + +void +PerfTest::run() +{ + typedef boost::shared_ptr<tests::storePerftools::common::Thread> ThreadPtr; // TODO - replace with qpid threads + + AsyncStoreImplPtr store = prepareStore(); + + std::deque<MockPersistableQueuePtr> queueList; + prepareQueues(queueList, store); + + std::deque<ThreadPtr> threads; + { // --- Start of timed section --- + tests::storePerftools::common::ScopedTimer st(m_testResult); + + for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) { + for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads + ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startEnqueues, + reinterpret_cast<void*>(queueList[q].get()))); + threads.push_back(tp); + } + for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads + ThreadPtr tp(new tests::storePerftools::common::Thread(queueList[q]->startDequeues, + reinterpret_cast<void*>(queueList[q].get()))); + threads.push_back(tp); + } + } + while (threads.size()) { + threads.front()->join(); + threads.pop_front(); + } + } // --- End of timed section --- + + destroyQueues(queueList); +// DEBUG MEASURE - REMOVE WHEN FIXED +//::sleep(2); +} + +void +PerfTest::toStream(std::ostream& os) const +{ + m_testOpts.printVals(os); + os << std::endl; + m_storeOpts.printVals(os); + os << std::endl; + os << m_testResult << std::endl; +} + +}}} // namespace tests::storePerftools::asyncPerf + +// ----------------------------------------------------------------- + +int +main(int argc, char** argv) +{ + qpid::CommonOptions co; + qpid::asyncStore::AsyncStoreOptions aso; + tests::storePerftools::asyncPerf::TestOptions to; + qpid::Options opts; + opts.add(co).add(aso).add(to); + try { + opts.parse(argc, argv); + aso.validate(); + to.validate(); + } + catch (std::exception& e) { + std::cerr << e.what() << std::endl; + return 1; + } + + // Handle options that just print information then exit. + if (co.version) { + std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl; + return 0; + } + if (co.help) { + std::cout << tests::storePerftools::name() << ": asyncPerf" << std::endl; + std::cout << "Performance test for the async store through the qpid async store interface." << std::endl; + std::cout << "Usage: asyncPerf [options]" << std::endl; + std::cout << opts << std::endl; + return 0; + } + + // Create and start test + tests::storePerftools::asyncPerf::PerfTest apt(to, aso); + apt.run(); + + // Print test result + std::cout << apt << std::endl; + ::sleep(1); + return 0; +} diff --git a/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h new file mode 100644 index 0000000000..1eb11a51fa --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/PerfTest.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file PerfTest.h + */ + +#ifndef tests_storePerfTools_asyncPerf_PerfTest_h_ +#define tests_storePerfTools_asyncPerf_PerfTest_h_ + +#include "TestResult.h" + +#include "tests/storePerfTools/common/Streamable.h" + +#include "qpid/framing/FieldTable.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Thread.h" + +#include <boost/shared_ptr.hpp> +#include <deque> + +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +class AsyncStoreOptions; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockPersistableQueue; +class TestOptions; + +typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr; +typedef boost::shared_ptr<MockPersistableQueue> MockPersistableQueuePtr; + +class PerfTest : public tests::storePerftools::common::Streamable +{ +public: + PerfTest(const TestOptions& to, + const qpid::asyncStore::AsyncStoreOptions& aso); + virtual ~PerfTest(); + void run(); + void toStream(std::ostream& os = std::cout) const; + +protected: + const TestOptions& m_testOpts; + const qpid::asyncStore::AsyncStoreOptions& m_storeOpts; + TestResult m_testResult; + qpid::framing::FieldTable m_queueArgs; + const char* m_msgData; + boost::shared_ptr<qpid::sys::Poller> m_poller; + qpid::sys::Thread m_pollingThread; + + AsyncStoreImplPtr prepareStore(); + void prepareQueues(std::deque<MockPersistableQueuePtr>& jrnlList, + AsyncStoreImplPtr store); + void destroyQueues(std::deque<MockPersistableQueuePtr>& jrnlList); + +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_PerfTest_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp new file mode 100644 index 0000000000..315e202d8b --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.cpp @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file QueuedMessage.cpp + */ + +#include "QueuedMessage.h" + +#include "MockTransactionContext.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +QueuedMessage::QueuedMessage(MockPersistableMessagePtr msg, + qpid::broker::EnqueueHandle& enqHandle, + MockTransactionContextPtr txn) : + m_msg(msg), + m_enqHandle(enqHandle), + m_txn(txn) +{ + if (txn) { + txn->addEnqueuedMsg(this); + } +} + +QueuedMessage::~QueuedMessage() +{} + +MockPersistableMessagePtr +QueuedMessage::getMessage() const +{ + return m_msg; +} + +qpid::broker::EnqueueHandle +QueuedMessage::getEnqueueHandle() const +{ + return m_enqHandle; +} + +MockTransactionContextPtr +QueuedMessage::getTransactionContext() const +{ + return m_txn; +} + +bool +QueuedMessage::isTransactional() const +{ + return m_txn.get() != 0; +} + +void +QueuedMessage::clearTransaction() +{ + m_txn.reset(static_cast<MockTransactionContext*>(0)); +} + +}}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h new file mode 100644 index 0000000000..4b3dab67f9 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/QueuedMessage.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file QueuedMessage.h + */ + +#ifndef tests_storePerfTools_asyncPerf_QueuedMessage_h_ +#define tests_storePerfTools_asyncPerf_QueuedMessage_h_ + +#include "qpid/broker/EnqueueHandle.h" +#include <boost/shared_ptr.hpp> + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockPersistableMessage; +class MockTransactionContext; + +typedef boost::shared_ptr<MockPersistableMessage> MockPersistableMessagePtr; +typedef boost::shared_ptr<MockTransactionContext> MockTransactionContextPtr; + +class QueuedMessage +{ +public: + QueuedMessage(MockPersistableMessagePtr msg, + qpid::broker::EnqueueHandle& enqHandle, + MockTransactionContextPtr txn); + virtual ~QueuedMessage(); + MockPersistableMessagePtr getMessage() const; + qpid::broker::EnqueueHandle getEnqueueHandle() const; + MockTransactionContextPtr getTransactionContext() const; + bool isTransactional() const; + void clearTransaction(); + +protected: + MockPersistableMessagePtr m_msg; + qpid::broker::EnqueueHandle m_enqHandle; + MockTransactionContextPtr m_txn; +}; + +}}} // namespace tests::storePerfTools + +#endif // tests_storePerfTools_asyncPerf_QueuedMessage_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp new file mode 100644 index 0000000000..27784ef661 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.cpp @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestOptions.cpp + */ + +#include "TestOptions.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +// static declarations +uint16_t TestOptions::s_defaultEnqTxnBlkSize = 0; +uint16_t TestOptions::s_defaultDeqTxnBlkSize = 0; + +TestOptions::TestOptions(const std::string& name) : + tests::storePerftools::common::TestOptions(name), + m_enqTxnBlockSize(s_defaultEnqTxnBlkSize), + m_deqTxnBlockSize(s_defaultDeqTxnBlkSize) +{ + doAddOptions(); +} + +TestOptions::TestOptions(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue, + const uint16_t enqTxnBlockSize, + const uint16_t deqTxnBlockSize, + const std::string& name) : + tests::storePerftools::common::TestOptions(numMsgs, msgSize, numQueues, numEnqThreadsPerQueue, numDeqThreadsPerQueue, name), + m_enqTxnBlockSize(enqTxnBlockSize), + m_deqTxnBlockSize(deqTxnBlockSize) +{ + doAddOptions(); +} + +TestOptions::~TestOptions() +{} + +void +TestOptions::printVals(std::ostream& os) const +{ + tests::storePerftools::common::TestOptions::printVals(os); + os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl; + os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl; +} + +void +TestOptions::doAddOptions() +{ + addOptions() + ("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"), + "Num enqueus per transaction (0 = no transactions)") + ("deq-txn-size,d", qpid::optValue(m_deqTxnBlockSize, "N"), + "Num dequeues per transaction (0 = no transactions)") + ; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h new file mode 100644 index 0000000000..b0e1e4ce74 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/TestOptions.h @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestOptions.h + */ + +#ifndef tests_storePerfTools_asyncPerf_TestOptions_h_ +#define tests_storePerfTools_asyncPerf_TestOptions_h_ + +#include "tests/storePerfTools/common/TestOptions.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class TestOptions : public tests::storePerftools::common::TestOptions +{ +public: + TestOptions(const std::string& name="Test Options"); + TestOptions(const uint32_t numMsgs, + const uint32_t msgSize, + const uint16_t numQueues, + const uint16_t numEnqThreadsPerQueue, + const uint16_t numDeqThreadsPerQueue, + const uint16_t enqTxnBlockSize, + const uint16_t deqTxnBlockSize, + const std::string& name="Test Options"); + virtual ~TestOptions(); + void printVals(std::ostream& os) const; + + uint16_t m_enqTxnBlockSize; ///< Transaction block size for enqueues + uint16_t m_deqTxnBlockSize; ///< Transaction block size for dequeues + +protected: + static uint16_t s_defaultEnqTxnBlkSize; ///< Default transaction block size for enqueues + static uint16_t s_defaultDeqTxnBlkSize; ///< Default transaction block size for dequeues + + void doAddOptions(); +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_TestOptions_h_ diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp new file mode 100644 index 0000000000..cf6f293494 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.cpp @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.cpp + */ + +#include "TestResult.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +TestResult::TestResult(const TestOptions& to) : + tests::storePerftools::common::TestResult(), + m_testOpts(to) +{} + +TestResult::~TestResult() +{} + +void +TestResult::toStream(std::ostream& os) const +{ + double msgsRate; + os << "TEST RESULTS:" << std::endl; + os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl; + os << " Msg size: " << m_testOpts.m_msgSize << std::endl; + os << " No. queues: " << m_testOpts.m_numQueues << std::endl; + os << " No. enq threads/queue: " << m_testOpts.m_numEnqThreadsPerQueue << std::endl; + os << " No. deq threads/queue: " << m_testOpts.m_numDeqThreadsPerQueue << std::endl; + os << " Time taken: " << m_elapsed << " sec" << std::endl; + uint32_t msgsPerQueue = m_testOpts.m_numMsgs * m_testOpts.m_numEnqThreadsPerQueue; + if (m_testOpts.m_numQueues > 1) { + msgsRate = double(msgsPerQueue) / m_elapsed; + os << " No. msgs per queue: " << msgsPerQueue << std::endl; + os << "Per queue msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; + os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl; + } + uint32_t totalMsgs = msgsPerQueue * m_testOpts.m_numQueues; + msgsRate = double(totalMsgs) / m_elapsed; + os << " Total no. msgs: " << totalMsgs << std::endl; + os << " Broker msg throughput: " << (msgsRate / 1e3) << " kMsgs/sec" << std::endl; + os << " " << (msgsRate * m_testOpts.m_msgSize / 1e6) << " MB/sec" << std::endl; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h new file mode 100644 index 0000000000..1310689ff8 --- /dev/null +++ b/cpp/src/tests/storePerfTools/asyncPerf/TestResult.h @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TestResult.h + */ + +#ifndef tests_storePerfTools_asyncPerf_TestResult_h_ +#define tests_storePerfTools_asyncPerf_TestResult_h_ + +#include "TestOptions.h" + +#include "tests/storePerfTools/common/TestResult.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class TestOptions; + +/** + * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal. + * + * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the + * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the + * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput. + * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor. + * + * Results are available through the use of toStream(), toString() or the << operators. + * + * Output is in the following format: + * <pre> + * TEST RESULTS: + * Msgs per thread: 10000 + * Msg size: 2048 + * No. queues: 2 + * No. threads/queue: 2 + * Time taken: 1.6626 sec + * Total no. msgs: 40000 + * Msg throughput: 24.0587 kMsgs/sec + * 49.2723 MB/sec + * </pre> + */ +class TestResult : public tests::storePerftools::common::TestResult +{ +public: + /** + * \brief Constructor + * + * Constructor. Will start the time interval measurement. + * + * \param tp Test parameter details used to calculate the performance results. + */ + TestResult(const TestOptions& to); + + /** + * \brief Virtual destructor + */ + virtual ~TestResult(); + + /** + * \brief Stream the performance test results to an output stream + * + * Convenience feature which streams a multi-line performance result an output stream. + * + * \param os Output stream to which the results are to be streamed + */ + void toStream(std::ostream& os = std::cout) const; + +protected: + TestOptions m_testOpts; ///< Test parameters used for performance calculations + +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerfTools_asyncPerf_TestResult_h_ |