diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-07-16 13:54:11 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-07-16 13:54:11 +0000 |
commit | a804510d81ade0594a75b5c9b8765cafcc233245 (patch) | |
tree | 8c6be643564b6d8c88619d17de7150c98a314781 /cpp/src/tests | |
parent | 1ab07197127e990da2c765ea0ffa5fd8ca47b7b6 (diff) | |
download | qpid-python-a804510d81ade0594a75b5c9b8765cafcc233245.tar.gz |
QPID-3858: Refactor to tidy up several class design issues
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1362039 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
20 files changed, 356 insertions, 206 deletions
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index ad9a518867..637442e128 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -344,78 +344,5 @@ add_library (dlclose_noop MODULE dlclose_noop.c) #check-long: # $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND= - -# Async Store perf tests -# ---------------------- - -# New journal perf test (jrnl2Perf) -set (jrnl2Perf_SOURCES - storePerftools/jrnlPerf/Journal.cpp - storePerftools/jrnlPerf/JournalParameters.cpp - storePerftools/jrnlPerf/PerfTest.cpp - storePerftools/jrnlPerf/TestResult.cpp - - storePerftools/common/Parameters.cpp - storePerftools/common/PerftoolError.cpp - storePerftools/common/ScopedTimable.cpp - storePerftools/common/ScopedTimer.cpp - storePerftools/common/Streamable.cpp - storePerftools/common/TestParameters.cpp - storePerftools/common/TestResult.cpp - storePerftools/common/Thread.cpp -) - -if (UNIX) - add_executable (jrnl2Perf ${jrnl2Perf_SOURCES}) - set_target_properties (jrnl2Perf PROPERTIES - COMPILE_FLAGS "-DJOURNAL2" - ) - target_link_libraries (jrnl2Perf - asyncStore - qpidbroker - rt - ) -endif (UNIX) - -# Async store perf test (asyncPerf) -set (asyncStorePerf_SOURCES - storePerftools/asyncPerf/Deliverable.cpp - storePerftools/asyncPerf/DeliveryRecord.cpp - storePerftools/asyncPerf/MessageAsyncContext.cpp - storePerftools/asyncPerf/MessageConsumer.cpp - storePerftools/asyncPerf/MessageDeque.cpp - storePerftools/asyncPerf/MessageProducer.cpp - storePerftools/asyncPerf/PerfTest.cpp - storePerftools/asyncPerf/QueueAsyncContext.cpp - storePerftools/asyncPerf/QueuedMessage.cpp - storePerftools/asyncPerf/SimpleMessage.cpp - storePerftools/asyncPerf/SimpleQueue.cpp - storePerftools/asyncPerf/TestOptions.cpp - storePerftools/asyncPerf/TestResult.cpp - storePerftools/asyncPerf/TxnAccept.cpp - storePerftools/asyncPerf/TxnPublish.cpp - - storePerftools/common/Parameters.cpp - storePerftools/common/PerftoolError.cpp - storePerftools/common/ScopedTimable.cpp - storePerftools/common/ScopedTimer.cpp - storePerftools/common/Streamable.cpp - storePerftools/common/TestOptions.cpp - storePerftools/common/TestResult.cpp - storePerftools/common/Thread.cpp -) - -if (UNIX) - add_executable (asyncStorePerf ${asyncStorePerf_SOURCES}) - set_target_properties (asyncStorePerf PROPERTIES - COMPILE_FLAGS "-DJOURNAL2" - ) - target_link_libraries (asyncStorePerf - boost_program_options - asyncStore - qpidbroker - qpidcommon - qpidtypes - rt - ) -endif (UNIX) +# Include async store tests +include(asyncstore.cmake) diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake new file mode 100644 index 0000000000..cd20394908 --- /dev/null +++ b/cpp/src/tests/asyncstore.cmake @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# +# Async store test CMake fragment, to be included in tests/CMakeLists.txt +# + +# New journal perf test (jrnl2Perf) +set (jrnl2Perf_SOURCES + storePerftools/jrnlPerf/Journal.cpp + storePerftools/jrnlPerf/JournalParameters.cpp + storePerftools/jrnlPerf/PerfTest.cpp + storePerftools/jrnlPerf/TestResult.cpp + + storePerftools/common/Parameters.cpp + storePerftools/common/PerftoolError.cpp + storePerftools/common/ScopedTimable.cpp + storePerftools/common/ScopedTimer.cpp + storePerftools/common/Streamable.cpp + storePerftools/common/TestParameters.cpp + storePerftools/common/TestResult.cpp + storePerftools/common/Thread.cpp +) + +if (UNIX) + add_executable (jrnl2Perf ${jrnl2Perf_SOURCES}) + set_target_properties (jrnl2Perf PROPERTIES + COMPILE_FLAGS "-DJOURNAL2" + ) + target_link_libraries (jrnl2Perf + asyncStore + qpidbroker + rt + ) +endif (UNIX) + +# Async store perf test (asyncPerf) +set (asyncStorePerf_SOURCES + storePerftools/asyncPerf/Deliverable.cpp + storePerftools/asyncPerf/DeliveryRecord.cpp + storePerftools/asyncPerf/MessageAsyncContext.cpp + storePerftools/asyncPerf/MessageConsumer.cpp + storePerftools/asyncPerf/MessageDeque.cpp + storePerftools/asyncPerf/MessageProducer.cpp + storePerftools/asyncPerf/PerfTest.cpp + storePerftools/asyncPerf/PersistableQueuedMessage.cpp + storePerftools/asyncPerf/QueueAsyncContext.cpp + storePerftools/asyncPerf/QueuedMessage.cpp + storePerftools/asyncPerf/SimpleMessage.cpp + storePerftools/asyncPerf/SimpleQueue.cpp + storePerftools/asyncPerf/TestOptions.cpp + storePerftools/asyncPerf/TestResult.cpp + storePerftools/asyncPerf/TxnAccept.cpp + storePerftools/asyncPerf/TxnPublish.cpp + + storePerftools/common/Parameters.cpp + storePerftools/common/PerftoolError.cpp + storePerftools/common/ScopedTimable.cpp + storePerftools/common/ScopedTimer.cpp + storePerftools/common/Streamable.cpp + storePerftools/common/TestOptions.cpp + storePerftools/common/TestResult.cpp + storePerftools/common/Thread.cpp +) + +if (UNIX) + add_executable (asyncStorePerf ${asyncStorePerf_SOURCES}) + set_target_properties (asyncStorePerf PROPERTIES + COMPILE_FLAGS "-DJOURNAL2" + ) + target_link_libraries (asyncStorePerf + boost_program_options + asyncStore + qpidbroker + qpidcommon + qpidtypes + rt + ) +endif (UNIX) diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp index b7250ecf40..1728a2dc1e 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp @@ -31,7 +31,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -DeliveryRecord::DeliveryRecord(const QueuedMessage& qm, +DeliveryRecord::DeliveryRecord(boost::shared_ptr<QueuedMessage> qm, MessageConsumer& mc, bool accepted) : m_queuedMessage(qm), @@ -47,7 +47,7 @@ bool DeliveryRecord::accept() { if (!m_ended) { - m_queuedMessage.getQueue()->dequeue(m_queuedMessage); + m_queuedMessage->getQueue()->dequeue(m_queuedMessage); m_accepted = true; setEnded(); } @@ -64,7 +64,7 @@ bool DeliveryRecord::setEnded() { m_ended = true; - m_queuedMessage.payload() = boost::intrusive_ptr<SimpleMessage>(0); + m_queuedMessage->payload() = boost::intrusive_ptr<SimpleMessage>(0); return isRedundant(); } @@ -83,7 +83,7 @@ DeliveryRecord::isRedundant() const void DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn) { - m_queuedMessage.getQueue()->dequeue(txn, m_queuedMessage); + m_queuedMessage->getQueue()->dequeue(txn, m_queuedMessage); } void @@ -92,7 +92,7 @@ DeliveryRecord::committed() const m_msgConsumer.commitComplete(); } -QueuedMessage +boost::shared_ptr<QueuedMessage> DeliveryRecord::getQueuedMessage() const { return m_queuedMessage; diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h index 427cf846f0..bb89787737 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h @@ -26,6 +26,8 @@ #include "QueuedMessage.h" +#include <boost/shared_ptr.hpp> + namespace qpid { namespace broker { class TxnHandle; @@ -39,7 +41,7 @@ class MessageConsumer; class DeliveryRecord { public: - DeliveryRecord(const QueuedMessage& qm, + DeliveryRecord(boost::shared_ptr<QueuedMessage> qm, MessageConsumer& mc, bool accepted); virtual ~DeliveryRecord(); @@ -50,9 +52,9 @@ public: bool isRedundant() const; void dequeue(qpid::broker::TxnHandle& txn); void committed() const; - QueuedMessage getQueuedMessage() const; + boost::shared_ptr<QueuedMessage> getQueuedMessage() const; private: - QueuedMessage m_queuedMessage; + boost::shared_ptr<QueuedMessage> m_queuedMessage; MessageConsumer& m_msgConsumer; bool m_accepted : 1; bool m_ended : 1; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp index c61ce352a1..8b79a91ac1 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp @@ -42,7 +42,7 @@ MessageDeque::size() } bool -MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*removed*/) +MessageDeque::push(boost::shared_ptr<QueuedMessage>& added) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); m_messages.push_back(added); @@ -50,7 +50,7 @@ MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*removed*/) } bool -MessageDeque::consume(QueuedMessage& msg) +MessageDeque::consume(boost::shared_ptr<QueuedMessage>& msg) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); if (!m_messages.empty()) { diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h index 75f422779e..021015f3e0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h @@ -46,10 +46,10 @@ public: MessageDeque(); virtual ~MessageDeque(); uint32_t size(); - bool push(const QueuedMessage& added, QueuedMessage& removed); - bool consume(QueuedMessage& msg); + bool push(boost::shared_ptr<QueuedMessage>& added); + bool consume(boost::shared_ptr<QueuedMessage>& msg); private: - std::deque<QueuedMessage> m_messages; + std::deque<boost::shared_ptr<QueuedMessage> > m_messages; qpid::sys::Mutex m_msgMutex; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/tests/storePerftools/asyncPerf/Messages.h index 9b5bd0be99..c1bfa328ea 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/Messages.h +++ b/cpp/src/tests/storePerftools/asyncPerf/Messages.h @@ -30,6 +30,7 @@ #ifndef tests_storePerftools_asyncPerf_Messages_h_ #define tests_storePerftools_asyncPerf_Messages_h_ +#include <boost/shared_ptr.hpp> #include <stdint.h> namespace tests { @@ -43,8 +44,8 @@ class Messages public: virtual ~Messages() {} virtual uint32_t size() = 0; - virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0; - virtual bool consume(QueuedMessage& msg) = 0; + virtual bool push(boost::shared_ptr<QueuedMessage>& added) = 0; + virtual bool consume(boost::shared_ptr<QueuedMessage>& msg) = 0; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 4d145d321d..1497b678a0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -31,7 +31,10 @@ #include "tests/storePerftools/common/ScopedTimer.h" #include "tests/storePerftools/common/Thread.h" +#include "qpid/Modules.h" // Use with loading store as module #include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/asyncStore/AsyncStoreOptions.h" +#include "qpid/broker/AsyncStore.h" #include "qpid/sys/Poller.h" #include <iomanip> @@ -161,16 +164,15 @@ PerfTest::destroyQueues() } } -}}} // namespace tests::storePerftools::asyncPerf - -// ----------------------------------------------------------------- - int -main(int argc, char** argv) +runPerfTest(int argc, char** argv) { + // Load async store module + qpid::tryShlib ("asyncStore.so", false); + qpid::CommonOptions co; qpid::asyncStore::AsyncStoreOptions aso; - tests::storePerftools::asyncPerf::TestOptions to; + TestOptions to; qpid::Options opts; opts.add(co).add(aso).add(to); try { @@ -203,5 +205,16 @@ main(int argc, char** argv) // Print test result std::cout << apt << std::endl; //::sleep(1); + return 0; } + +}}} // namespace tests::storePerftools::asyncPerf + +// ----------------------------------------------------------------- + +int +main(int argc, char** argv) +{ + return tests::storePerftools::asyncPerf::runPerfTest(argc, argv); +} diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index 6cdf015f76..7cbb71322f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -36,6 +36,9 @@ #include <deque> namespace qpid { +namespace broker { +class AsyncStore; +} namespace asyncStore { class AsyncStoreImpl; class AsyncStoreOptions; @@ -83,6 +86,8 @@ private: }; +int runPerfTest(int argc, char** argv); + }}} // namespace tests::storePerftools::asyncPerf #endif // tests_storePerftools_asyncPerf_PerfTest_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp new file mode 100644 index 0000000000..2eba7d5be5 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.cpp @@ -0,0 +1,52 @@ +#include "PersistableQueuedMessage.h" + +#include "SimpleQueue.h" +#include "SimpleMessage.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +PersistableQueuedMessage::PersistableQueuedMessage() +{} + +PersistableQueuedMessage::PersistableQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr<SimpleMessage> msg) : + QueuedMessage(q, msg), + m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle())) +{} + +PersistableQueuedMessage::PersistableQueuedMessage(const PersistableQueuedMessage& pm) : + QueuedMessage(pm), + m_enqHandle(pm.m_enqHandle) +{} + +PersistableQueuedMessage::PersistableQueuedMessage(PersistableQueuedMessage* const pm) : + QueuedMessage(pm), + m_enqHandle(pm->m_enqHandle) +{} + +PersistableQueuedMessage::~PersistableQueuedMessage() +{} + +PersistableQueuedMessage& +PersistableQueuedMessage::operator=(const PersistableQueuedMessage& rhs) +{ + QueuedMessage::operator=(rhs); + m_enqHandle = rhs.m_enqHandle; + return *this; +} + +const qpid::broker::EnqueueHandle& +PersistableQueuedMessage::enqHandle() const +{ + return m_enqHandle; +} + +qpid::broker::EnqueueHandle& +PersistableQueuedMessage::enqHandle() +{ + return m_enqHandle; +} + +}}} // tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h new file mode 100644 index 0000000000..1e9446aa57 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/PersistableQueuedMessage.h @@ -0,0 +1,31 @@ +#ifndef tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_ +#define tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_ + +#include "QueuedMessage.h" + +#include "qpid/broker/EnqueueHandle.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class PersistableQueuedMessage : public QueuedMessage { +public: + PersistableQueuedMessage(); + PersistableQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr<SimpleMessage> msg); + PersistableQueuedMessage(const PersistableQueuedMessage& pqm); + PersistableQueuedMessage(PersistableQueuedMessage* const pqm); + virtual ~PersistableQueuedMessage(); + PersistableQueuedMessage& operator=(const PersistableQueuedMessage& rhs); + + const qpid::broker::EnqueueHandle& enqHandle() const; + qpid::broker::EnqueueHandle& enqHandle(); + +private: + qpid::broker::EnqueueHandle m_enqHandle; +}; + +}}} // tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_PersistableQueuedMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index 112a5ab1dd..3a8850c699 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -31,6 +31,10 @@ #include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> +namespace qpid { +namespace broker { +typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); +}} namespace tests { namespace storePerftools { diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index 11af7c9466..a733a96171 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -38,28 +38,25 @@ QueuedMessage::QueuedMessage() : QueuedMessage::QueuedMessage(SimpleQueue* q, boost::intrusive_ptr<SimpleMessage> msg) : + boost::enable_shared_from_this<QueuedMessage>(), m_queue(q), - m_msg(msg), - m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0)) + m_msg(msg) {} QueuedMessage::QueuedMessage(const QueuedMessage& qm) : + boost::enable_shared_from_this<QueuedMessage>(), m_queue(qm.m_queue), - m_msg(qm.m_msg), - m_enqHandle(qm.m_enqHandle) + m_msg(qm.m_msg) {} -QueuedMessage::~QueuedMessage() +QueuedMessage::QueuedMessage(QueuedMessage* const qm) : + boost::enable_shared_from_this<QueuedMessage>(), + m_queue(qm->m_queue), + m_msg(qm->m_msg) {} -QueuedMessage& -QueuedMessage::operator=(const QueuedMessage& rhs) -{ - m_queue = rhs.m_queue; - m_msg = rhs.m_msg; - m_enqHandle = rhs.m_enqHandle; - return *this; -} +QueuedMessage::~QueuedMessage() +{} SimpleQueue* QueuedMessage::getQueue() const @@ -73,22 +70,10 @@ QueuedMessage::payload() const return m_msg; } -const qpid::broker::EnqueueHandle& -QueuedMessage::enqHandle() const -{ - return m_enqHandle; -} - -qpid::broker::EnqueueHandle& -QueuedMessage::enqHandle() -{ - return m_enqHandle; -} - void QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th) { - m_queue->enqueue(th, *this); + m_queue->enqueue(th, shared_from_this()); } void diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index 12c8e4da08..7d4e5bbbe4 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -24,8 +24,9 @@ #ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_ #define tests_storePerftools_asyncPerf_QueuedMessage_h_ -#include "qpid/broker/EnqueueHandle.h" +#include "qpid/broker/AsyncStore.h" +#include <boost/enable_shared_from_this.hpp> #include <boost/intrusive_ptr.hpp> namespace qpid { @@ -42,19 +43,17 @@ namespace asyncPerf { class SimpleMessage; class SimpleQueue; -class QueuedMessage +class QueuedMessage : public boost::enable_shared_from_this<QueuedMessage> { public: QueuedMessage(); QueuedMessage(SimpleQueue* q, boost::intrusive_ptr<SimpleMessage> msg); QueuedMessage(const QueuedMessage& qm); - ~QueuedMessage(); - QueuedMessage& operator=(const QueuedMessage& rhs); + QueuedMessage(QueuedMessage* const qm); + virtual ~QueuedMessage(); SimpleQueue* getQueue() const; boost::intrusive_ptr<SimpleMessage> payload() const; - const qpid::broker::EnqueueHandle& enqHandle() const; - qpid::broker::EnqueueHandle& enqHandle(); // -- Transaction handling --- void prepareEnqueue(qpid::broker::TxnHandle& th); @@ -64,7 +63,6 @@ public: private: SimpleQueue* m_queue; boost::intrusive_ptr<SimpleMessage> m_msg; - qpid::broker::EnqueueHandle m_enqHandle; }; }}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp index 29db6ceaf2..889f7a4cdd 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp @@ -30,11 +30,20 @@ namespace storePerftools { namespace asyncPerf { SimpleMessage::SimpleMessage(const char* msgData, + const uint32_t msgSize) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast<size_t>(msgSize)), + m_store(0), + m_msgHandle(qpid::broker::MessageHandle()) +{} + +SimpleMessage::SimpleMessage(const char* msgData, const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store) : + qpid::broker::AsyncStore* store) : m_persistenceId(0ULL), m_msg(msgData, static_cast<size_t>(msgSize)), - m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0)) + m_store(store), + m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle()) {} SimpleMessage::~SimpleMessage() @@ -95,7 +104,7 @@ SimpleMessage::encodedHeaderSize() const bool SimpleMessage::isPersistent() const { - return m_msgHandle.isValid(); + return m_store != 0; } uint64_t diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h index 1b3e034814..01f54c1c19 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h @@ -28,13 +28,6 @@ #include "qpid/broker/MessageHandle.h" #include "qpid/broker/PersistableMessage.h" -#include <set> - -namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -}} - namespace tests { namespace storePerftools { namespace asyncPerf { @@ -46,8 +39,10 @@ class SimpleMessage: public qpid::broker::PersistableMessage, { public: SimpleMessage(const char* msgData, + const uint32_t msgSize); + SimpleMessage(const char* msgData, const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store); + qpid::broker::AsyncStore* store); virtual ~SimpleMessage(); const qpid::broker::MessageHandle& getHandle() const; qpid::broker::MessageHandle& getHandle(); @@ -71,6 +66,8 @@ public: private: mutable uint64_t m_persistenceId; const std::string m_msg; + qpid::broker::AsyncStore* m_store; + qpid::broker::MessageHandle m_msgHandle; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index 8bb79367ed..3bce2fb52a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -26,13 +26,15 @@ #include "DeliveryRecord.h" #include "MessageConsumer.h" #include "MessageDeque.h" -#include "SimpleMessage.h" +#include "PersistableQueuedMessage.h" #include "QueueAsyncContext.h" #include "QueuedMessage.h" +#include "SimpleMessage.h" #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/broker/AsyncResultHandle.h" -#include "qpid/broker/TxnHandle.h" + +#include <boost/make_shared.hpp> namespace tests { namespace storePerftools { @@ -44,7 +46,7 @@ qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operat SimpleQueue::SimpleQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq) : qpid::broker::PersistableQueue(), m_name(name), @@ -117,7 +119,7 @@ SimpleQueue::getHandle() return m_queueHandle; } -qpid::asyncStore::AsyncStoreImpl* +qpid::broker::AsyncStore* SimpleQueue::getStore() { return m_store; @@ -161,15 +163,24 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) void SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { - QueuedMessage qm(this, msg); + boost::shared_ptr<QueuedMessage> qm; + if (msg->isPersistent() && m_store) { + qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + } else { + qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + } +//boost::shared_ptr<PersistableQueuedMessage> pqm1 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +//assert(pqm1.get()); enqueue(s_nullTxnHandle, qm); +//boost::shared_ptr<PersistableQueuedMessage> pqm2 = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +//assert(pqm2.get()); push(qm); } bool SimpleQueue::dispatch(MessageConsumer& mc) { - QueuedMessage qm; + boost::shared_ptr<QueuedMessage> qm; if (m_messages->consume(qm)) { boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false)); mc.record(dr); @@ -179,43 +190,47 @@ SimpleQueue::dispatch(MessageConsumer& mc) } bool -SimpleQueue::enqueue(QueuedMessage& qm) +SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) { return enqueue(s_nullTxnHandle, qm); } bool SimpleQueue::enqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<QueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } - if (qm.payload()->isPersistent() && m_store) { - qm.payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(th, qm); + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->enqueueAsync(shared_from_this(), m_store); + return asyncEnqueue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm)); } return false; } bool -SimpleQueue::dequeue(QueuedMessage& qm) +SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) { return dequeue(s_nullTxnHandle, qm); } bool SimpleQueue::dequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<QueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; } - if (qm.payload()->isPersistent() && m_store) { - qm.payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(th, qm); + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->dequeueAsync(shared_from_this(), m_store); +//assert(qm.get()); +//boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +//assert(pqm.get()); +//return asyncDequeue(th, pqm); + return asyncDequeue(th, boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm)); } return true; } @@ -223,7 +238,12 @@ SimpleQueue::dequeue(qpid::broker::TxnHandle& th, void SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { - QueuedMessage qm(this, msg); + boost::shared_ptr<QueuedMessage> qm; + if (msg->isPersistent() && m_store) { + qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(this, msg)); + } else { + qm = boost::make_shared<QueuedMessage>(new QueuedMessage(this, msg)); + } push(qm); } @@ -343,11 +363,13 @@ SimpleQueue::ScopedUse::~ScopedUse() // private void -SimpleQueue::push(QueuedMessage& qm, +SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, bool /*isRecovery*/) { - QueuedMessage removed; - m_messages->push(qm, removed); +boost::shared_ptr<PersistableQueuedMessage> pqm = boost::dynamic_pointer_cast<PersistableQueuedMessage>(qm); +assert(pqm.get()); + + m_messages->push(qm); } // --- End Members & methods in msg handling path from qpid::Queue --- @@ -355,20 +377,22 @@ SimpleQueue::push(QueuedMessage& qm, // private bool SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<PersistableQueuedMessage> pqm) { - qm.payload()->setPersistenceId(m_store->getNextRid()); -//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(pqm.get()); +// qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << pqm->payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), - qm.payload(), + pqm->payload(), th, qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, &handleAsyncResult, &m_resultQueue)); + // TODO : This must be done from inside store, not here if (th.isValid()) { th.incrOpCnt(); } - m_store->submitEnqueue(qm.enqHandle(), + m_store->submitEnqueue(pqm->enqHandle(), th, qac); ++m_asyncOpCounter; @@ -378,19 +402,21 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, // private bool SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) + boost::shared_ptr<PersistableQueuedMessage> pqm) { + assert(pqm.get()); //std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), - qm.payload(), + pqm->payload(), th, qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, &handleAsyncResult, &m_resultQueue)); + // TODO : This must be done from inside store, not here if (th.isValid()) { th.incrOpCnt(); } - m_store->submitDequeue(qm.enqHandle(), + m_store->submitDequeue(pqm->enqHandle(), th, qac); ++m_asyncOpCounter; @@ -445,6 +471,8 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) --m_asyncOpCounter; qpid::broker::TxnHandle th = qc->getTxnHandle(); + + // TODO : This must be done from inside store, not here if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } @@ -459,6 +487,8 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) --m_asyncOpCounter; qpid::broker::TxnHandle th = qc->getTxnHandle(); + + // TODO : This must be done from inside store, not here if (th.isValid()) { // transactional enqueue th.decrOpCnt(); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h index 59e12b5c93..81ea8b022b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h @@ -40,7 +40,6 @@ class AsyncStoreImpl; } namespace broker { class AsyncResultQueue; -class TxnHandle; } namespace framing { class FieldTable; @@ -52,9 +51,10 @@ namespace asyncPerf { class MessageConsumer; class Messages; -class SimpleMessage; +class PersistableQueuedMessage; class QueueAsyncContext; class QueuedMessage; +class SimpleMessage; class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>, public qpid::broker::PersistableQueue, @@ -63,14 +63,14 @@ class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>, public: SimpleQueue(const std::string& name, const qpid::framing::FieldTable& args, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq); virtual ~SimpleQueue(); static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res); const qpid::broker::QueueHandle& getHandle() const; qpid::broker::QueueHandle& getHandle(); - qpid::asyncStore::AsyncStoreImpl* getStore(); + qpid::broker::AsyncStore* getStore(); void asyncCreate(); void asyncDestroy(const bool deleteQueue); @@ -78,12 +78,12 @@ public: // --- Methods in msg handling path from qpid::Queue --- void deliver(boost::intrusive_ptr<SimpleMessage> msg); bool dispatch(MessageConsumer& mc); - bool enqueue(QueuedMessage& qm); + bool enqueue(boost::shared_ptr<QueuedMessage> qm); bool enqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); - bool dequeue(QueuedMessage& qm); + boost::shared_ptr<QueuedMessage> qm); + bool dequeue(boost::shared_ptr<QueuedMessage> qm); bool dequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); + boost::shared_ptr<QueuedMessage> qm); void process(boost::intrusive_ptr<SimpleMessage> msg); void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg); @@ -106,9 +106,9 @@ private: static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations const std::string m_name; - qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncStore* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; + qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter! mutable uint64_t m_persistenceId; std::string m_persistableData; qpid::broker::QueueHandle m_queueHandle; @@ -135,14 +135,14 @@ private: }; UsageBarrier m_barrier; std::auto_ptr<Messages> m_messages; - void push(QueuedMessage& qm, + void push(boost::shared_ptr<QueuedMessage> qm, bool isRecovery = false); // -- Async ops --- bool asyncEnqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); + boost::shared_ptr<PersistableQueuedMessage> pqm); bool asyncDequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm); + boost::shared_ptr<PersistableQueuedMessage> pqm); // --- Async op counter --- void destroyCheck(const std::string& opDescr) const; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp index 3c5b99b5d5..7bede50272 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp @@ -25,6 +25,8 @@ #include "DeliveryRecord.h" +#include "qpid/log/Statement.h" + namespace tests { namespace storePerftools { namespace asyncPerf { @@ -41,15 +43,14 @@ TxnAccept::~TxnAccept() bool TxnAccept::prepare(qpid::broker::TxnHandle& th) throw() { -//std::cout << "TTT TxnAccept::prepare" << std::endl << std::flush; try { for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->dequeue(th); } } catch (const std::exception& e) { - std::cerr << "TxnAccept: Failed to prepare transaction: " << e.what() << std::endl; + QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what()); } catch (...) { - std::cerr << "TxnAccept: Failed to prepare transaction: (unknown error)" << std::endl; + QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)"); } return false; } @@ -57,23 +58,20 @@ TxnAccept::prepare(qpid::broker::TxnHandle& th) throw() void TxnAccept::commit() throw() { -//std::cout << "TTT TxnAccept::commit" << std::endl << std::flush; try { for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { (*i)->committed(); (*i)->setEnded(); } } catch (const std::exception& e) { - std::cerr << "TxnAccept: Failed to commit transaction: " << e.what() << std::endl; + QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what()); } catch(...) { - std::cerr << "TxnAccept: Failed to commit transaction: (unknown error)" << std::endl; + QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)"); } } void TxnAccept::rollback() throw() -{ -//std::cout << "TTT TxnAccept::rollback" << std::endl << std::flush; -} +{} }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp index 10e48bef82..0c34520d06 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp @@ -21,11 +21,16 @@ * \file TxnPublish.cpp */ -#include "SimpleMessage.h" -#include "SimpleQueue.h" // debug msg #include "TxnPublish.h" +#include "PersistableQueuedMessage.h" #include "QueuedMessage.h" +#include "SimpleMessage.h" +#include "SimpleQueue.h" // debug msg + +#include "qpid/log/Statement.h" + +#include <boost/make_shared.hpp> namespace tests { namespace storePerftools { @@ -33,9 +38,7 @@ namespace asyncPerf { TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) : m_msg(msg) -{ -//std::cout << "TTT new TxnPublish" << std::endl << std::flush; -} +{} TxnPublish::~TxnPublish() {} @@ -43,7 +46,6 @@ TxnPublish::~TxnPublish() bool TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() { -//std::cout << "TTT TxnPublish::prepare: " << m_queues.size() << " queues" << std::endl << std::flush; try{ while (!m_queues.empty()) { m_queues.front()->prepareEnqueue(th); @@ -52,9 +54,9 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() } return true; } catch (const std::exception& e) { - std::cerr << "TxnPublish: Failed to prepare transaction: " << e.what() << std::endl; + QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what()); } catch (...) { - std::cerr << "TxnPublish: Failed to prepare transaction: (unknown error)" << std::endl; + QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)"); } return false; } @@ -62,30 +64,28 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() void TxnPublish::commit() throw() { -//std::cout << "TTT TxnPublish::commit" << std::endl << std::flush; try { for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { (*i)->commitEnqueue(); } } catch (const std::exception& e) { - std::cerr << "TxnPublish: Failed to commit transaction: " << e.what() << std::endl; + QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what()); } catch (...) { - std::cerr << "TxnPublish: Failed to commit transaction: (unknown error)" << std::endl; + QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)"); } } void TxnPublish::rollback() throw() { -//std::cout << "TTT TxnPublish::rollback" << std::endl << std::flush; try { for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { (*i)->abortEnqueue(); } } catch (const std::exception& e) { - std::cerr << "TxnPublish: Failed to rollback transaction: " << e.what() << std::endl; + QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what()); } catch (...) { - std::cerr << "TxnPublish: Failed to rollback transaction: (unknown error)" << std::endl; + QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)"); } } @@ -98,8 +98,12 @@ TxnPublish::contentSize() void TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) { -//std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush; - boost::shared_ptr<QueuedMessage> qm(new QueuedMessage(queue.get(), m_msg)); + boost::shared_ptr<QueuedMessage> qm; + if (m_msg->isPersistent() && queue->getStore()) { + qm = boost::make_shared<PersistableQueuedMessage>(new PersistableQueuedMessage(queue.get(), m_msg)); + } else { + qm = boost::make_shared<QueuedMessage>(new QueuedMessage(queue.get(), m_msg)); + } m_queues.push_back(qm); m_delivered = true; } |