diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-06-01 15:30:01 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-01 15:30:01 +0000 |
commit | ad9bebb1157f009151973cf721fdebdd663d39e3 (patch) | |
tree | 3b8dc0a9fa3de3b88bcbb82572a06cb579fa3002 | |
parent | 220841d24ff48f27339000e887d5465a53c39013 (diff) | |
download | qpid-python-ad9bebb1157f009151973cf721fdebdd663d39e3.tar.gz |
WIP: Non-transactional message path in place. Transactions not working.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1345240 13f79535-47bb-0310-9956-ffa450edef68
41 files changed, 1308 insertions, 435 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 8246050f96..06608128bc 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1078,7 +1078,6 @@ set (qpidbroker_SOURCES qpid/amqp_0_10/Connection.cpp qpid/broker/AsyncStore.cpp qpid/broker/Broker.cpp - qpid/broker/BrokerAsyncContext.h qpid/broker/Credit.cpp qpid/broker/Exchange.cpp qpid/broker/ExpiryPolicy.cpp @@ -1487,7 +1486,9 @@ set (jrnl2_SOURCES # AsyncStore source files set (asyncStore_SOURCES + qpid/asyncStore/AsyncOpCounter.cpp qpid/asyncStore/AsyncOperation.cpp + qpid/asyncStore/AsyncStoreHandleImpl.cpp qpid/asyncStore/AsyncStoreImpl.cpp qpid/asyncStore/AsyncStoreOptions.cpp qpid/asyncStore/ConfigHandleImpl.cpp diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index 5a4905fef6..083034acc4 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -246,9 +246,9 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, - qpid::broker::TxnHandle& txnHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) + qpid::broker::TxnHandle& txnHandle, + qpid::broker::ResultCallback resultCb, + qpid::broker::BrokerAsyncContext* brokerCtxt) { AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY, dynamic_cast<qpid::broker::IdHandle*>(&eventHandle), @@ -260,18 +260,6 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle, void AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) -{ - AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), - resultCb, - brokerCtxt); - m_operations.submit(op); -} - -void -AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, qpid::broker::ResultCallback resultCb, qpid::broker::BrokerAsyncContext* brokerCtxt) @@ -282,18 +270,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, resultCb, brokerCtxt); m_operations.submit(op); -} - -void -AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt) -{ - AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE, - dynamic_cast<qpid::broker::IdHandle*>(&enqHandle), - resultCb, - brokerCtxt); - m_operations.submit(op); +//delete op; +//delete brokerCtxt; } void diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index 7e3b3e94da..0298c74dc5 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -111,16 +111,10 @@ public: qpid::broker::BrokerAsyncContext* brokerCtxt); void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); - void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, qpid::broker::ResultCallback resultCb, qpid::broker::BrokerAsyncContext* brokerCtxt); void submitDequeue(qpid::broker::EnqueueHandle& enqHandle, - qpid::broker::ResultCallback resultCb, - qpid::broker::BrokerAsyncContext* brokerCtxt); - void submitDequeue(qpid::broker::EnqueueHandle& enqHandle, qpid::broker::TxnHandle& txnHandle, qpid::broker::ResultCallback resultCb, qpid::broker::BrokerAsyncContext* brokerCtxt); diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp index 1e52eb3612..69ddf7645e 100644 --- a/cpp/src/qpid/asyncStore/OperationQueue.cpp +++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp @@ -23,8 +23,6 @@ #include "OperationQueue.h" -#include "qpid/broker/BrokerAsyncContext.h" - namespace qpid { namespace asyncStore { @@ -42,7 +40,7 @@ OperationQueue::~OperationQueue() void OperationQueue::submit(const AsyncOperation* op) { -//std::cout << "***** OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; +//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush; m_opQueue.push(op); } @@ -51,7 +49,7 @@ OperationQueue::OpQueue::Batch::const_iterator OperationQueue::handle(const OperationQueue::OpQueue::Batch& e) { for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) { -//std::cout << "##### OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; +//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush; if ((*i)->m_resCb) { ((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt); } else { diff --git a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h index b13caa5462..207bbc68f2 100644 --- a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h +++ b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h @@ -40,7 +40,7 @@ public: /** * \brief Constructor with an option to set an inital value for the counter. */ - AtomicCounter(T initialValue = T(0)) : + AtomicCounter(const T initialValue = T(0)) : m_cnt(initialValue) {} @@ -58,13 +58,90 @@ public: * first call to next() will return 1. Upon overflow, the counter will be incremented twice so as to avoid * returning the value 0. */ - virtual T next() + T + next() { - // --- START OF CRITICAL SECTION --- ScopedLock l(m_mutex); while (!++m_cnt) ; // Cannot return 0x0 if m_cnt should overflow return m_cnt; - } // --- END OF CRITICAL SECTION --- + } + + void + operator++() + { + ScopedLock l(m_mutex); + ++m_cnt; + } + + void + operator--() + { + ScopedLock l(m_mutex); + --m_cnt; + } + + T + get() const + { + ScopedLock l(m_mutex); + return m_cnt; + } + + bool + operator==(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt == rhs.get(); + } + + bool + operator==(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt == rhs; + } + + bool + operator!=(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt != rhs.get(); + } + + bool + operator!=(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt != rhs; + } + + bool + operator>(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt > rhs.get(); + } + + bool + operator>(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt > rhs; + } + + bool + operator<(const AtomicCounter<T>& rhs) + { + ScopedLock l(m_mutex); + return m_cnt < rhs.get(); + } + + bool + operator<(const T rhs) + { + ScopedLock l(m_mutex); + return m_cnt < rhs; + } protected: T m_cnt; ///< Internal count value diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/AsyncStore.cpp index ff3e77dba5..649049bf41 100644 --- a/cpp/src/qpid/broker/AsyncStore.cpp +++ b/cpp/src/qpid/broker/AsyncStore.cpp @@ -22,6 +22,9 @@ namespace qpid { namespace broker { +BrokerAsyncContext::~BrokerAsyncContext() +{} + DataSource::~DataSource() {} diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 15e9120edb..eb47d62cf0 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -28,11 +28,14 @@ #include <string> namespace qpid { - namespace broker { -// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting: -class BrokerAsyncContext; +// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting +// Subclass this for specific contexts +class BrokerAsyncContext { +public: + virtual ~BrokerAsyncContext(); +}; // Subclassed by broker: class DataSource { @@ -96,9 +99,7 @@ public: virtual void submitDestroy(EventHandle&, ResultCallback, BrokerAsyncContext*) = 0; virtual void submitDestroy(EventHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitEnqueue(EnqueueHandle&, ResultCallback, BrokerAsyncContext*) = 0; virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; - virtual void submitDequeue(EnqueueHandle&, ResultCallback, BrokerAsyncContext*) = 0; virtual void submitDequeue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0; // Legacy - Restore FTD message, is NOT async! diff --git a/cpp/src/qpid/broker/BrokerAsyncContext.h b/cpp/src/qpid/broker/BrokerAsyncContext.h deleted file mode 100644 index 38d53a84f1..0000000000 --- a/cpp/src/qpid/broker/BrokerAsyncContext.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef qpid_broker_BrokerContext_hpp_ -#define qpid_broker_BrokerContext_hpp_ - -namespace qpid { -namespace broker { - -class BrokerAsyncContext -{ -public: - virtual ~BrokerAsyncContext() {} -}; - -}} // namespace qpid::broker - -#endif // qpid_broker_BrokerContext_hpp_ diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h index 74c38d92cc..9339d81f32 100644 --- a/cpp/src/qpid/broker/MessageHandle.h +++ b/cpp/src/qpid/broker/MessageHandle.h @@ -32,7 +32,8 @@ namespace qpid { namespace broker { -class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>, public IdHandle +class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>, + public IdHandle { public: MessageHandle(qpid::asyncStore::MessageHandleImpl* p = 0); @@ -44,8 +45,8 @@ public: // <none> private: - typedef qpid::asyncStore::MessageHandleImpl Impl; - Impl* impl; + //typedef qpid::asyncStore::MessageHandleImpl Impl; + //Impl* impl; friend class qpid::messaging::PrivateImplRef<MessageHandle>; }; diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index 7ba28eb293..957248b522 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -21,7 +21,8 @@ #include "qpid/broker/PersistableMessage.h" -#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/MessageStore.h" +//#include "qpid/broker/AsyncStore.h" #include <iostream> using namespace qpid::broker; @@ -29,13 +30,12 @@ using namespace qpid::broker; namespace qpid { namespace broker { -class MessageStore; - PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : asyncDequeueCounter(0), - store(0) + store(0), + asyncStore(0) {} void PersistableMessage::flush() @@ -78,8 +78,8 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ return false; } - -void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { +// deprecated +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -88,7 +88,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa } } -void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + asyncStore = _store; + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); + } +} + +// deprecated +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { + addToSyncList(queue, _store); + enqueueStart(); +} + +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) { addToSyncList(queue, _store); enqueueStart(); } @@ -111,7 +126,8 @@ void PersistableMessage::dequeueComplete() { if (notify) allDequeuesComplete(); } -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { +// deprecated +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); store = _store; @@ -121,6 +137,16 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag dequeueAsync(); } +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) { + if (_store){ + sys::ScopedLock<sys::Mutex> l(storeLock); + asyncStore = _store; + boost::weak_ptr<PersistableQueue> q(queue); + synclist.push_back(q); + } + dequeueAsync(); +} + void PersistableMessage::dequeueAsync() { sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); asyncDequeueCounter++; @@ -128,11 +154,17 @@ void PersistableMessage::dequeueAsync() { PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {} +// deprecated void PersistableMessage::setStore(MessageStore* s) { store = s; } +void PersistableMessage::setStore(AsyncStore* s) +{ + asyncStore = s; +} + void PersistableMessage::requestContentRelease() { contentReleaseState.requested = true; diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index d29c2c45b4..8823cfa638 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -37,6 +37,7 @@ namespace qpid { namespace broker { class MessageStore; +class AsyncStore; /** * Base class for persistable messages. @@ -86,7 +87,8 @@ class PersistableMessage : public Persistable void setContentReleased(); - MessageStore* store; + MessageStore* store; // deprecated, use AsyncStore + AsyncStore* asyncStore; // new AsyncStore interface public: @@ -105,7 +107,8 @@ class PersistableMessage : public Persistable QPID_BROKER_EXTERN bool isContentReleased() const; - QPID_BROKER_EXTERN void setStore(MessageStore*); + QPID_BROKER_EXTERN void setStore(MessageStore*); // deprecated + QPID_BROKER_EXTERN void setStore(AsyncStore*); void requestContentRelease(); void blockContentRelease(); bool checkContentReleasable(); @@ -121,20 +124,25 @@ class PersistableMessage : public Persistable QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); } QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); } - QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, + QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated MessageStore* _store); + QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, + AsyncStore* _store); QPID_BROKER_EXTERN bool isDequeueComplete(); QPID_BROKER_EXTERN void dequeueComplete(); - QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, + QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated MessageStore* _store); + QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, + AsyncStore* _store); bool isStoredOnQueue(PersistableQueue::shared_ptr queue); - void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); + void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); // deprecated + void addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store); }; }} diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 2bce8c058e..057879a602 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -380,6 +380,9 @@ endif (UNIX) # Async store perf test (asyncPerf) set (asyncStorePerf_SOURCES storePerftools/asyncPerf/MessageAsyncContext.cpp + storePerftools/asyncPerf/MessageConsumer.cpp + storePerftools/asyncPerf/MessageDeque.cpp + storePerftools/asyncPerf/MessageProducer.cpp storePerftools/asyncPerf/MockPersistableMessage.cpp storePerftools/asyncPerf/MockPersistableQueue.cpp storePerftools/asyncPerf/MockTransactionContext.cpp @@ -396,7 +399,6 @@ set (asyncStorePerf_SOURCES storePerftools/common/ScopedTimer.cpp storePerftools/common/Streamable.cpp storePerftools/common/TestOptions.cpp - storePerftools/common/TestParameters.cpp storePerftools/common/TestResult.cpp storePerftools/common/Thread.cpp ) diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 0058aa5133..fb429ca981 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -106,7 +106,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { //Test basic delivery: intrusive_ptr<Message> msg1 = create_message("e", "A"); - msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process + msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); @@ -121,7 +121,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { QPID_AUTO_TEST_CASE(testAsyncMessageCount){ Queue::shared_ptr queue(new Queue("my_test_queue", true)); intrusive_ptr<Message> msg1 = create_message("e", "A"); - msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process + msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); diff --git a/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h b/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h new file mode 100644 index 0000000000..b777234616 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file AtomicCounter.h + */ + +#ifndef tests_storePerftools_asyncPerf_AtomicCounter_h_ +#define tests_storePerftools_asyncPerf_AtomicCounter_h_ + +#include "qpid/sys/Condition.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +template <class T> +class AtomicCounter +{ +public: + AtomicCounter(const T& initValue = T(0)) : + m_cnt(initValue), + m_cntMutex(), + m_cntCondition() + {} + + virtual ~AtomicCounter() + {} + + T& + get() const + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + return m_cnt; + } + + void + operator++() + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + ++m_cnt; + } + + void + operator--() + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + if (--m_cnt == 0) { + m_cntCondition.notify(); + } + } + + void + waitForZero(const qpid::sys::Duration& d) + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex); + while (m_cnt != 0) { + m_cntCondition.wait(m_cntMutex, qpid::sys::AbsTime(qpid::sys::AbsTime(), d)); + } + } + +protected: + T m_cnt; + mutable qpid::sys::Mutex m_cntMutex; + qpid::sys::Condition m_cntCondition; +}; + +typedef AtomicCounter<uint32_t> AsyncOpCounter; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_AtomicCounter_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp index d88b6570a1..ad67bdd32f 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp @@ -23,50 +23,52 @@ #include "MessageAsyncContext.h" +#include <cassert> + namespace tests { namespace storePerftools { namespace asyncPerf { -MessageContext::MessageContext(MockPersistableMessage::shared_ptr msg, - const qpid::asyncStore::AsyncOperation::opCode op, - MockPersistableQueue* q) : +MessageAsyncContext::MessageAsyncContext(boost::shared_ptr<MockPersistableMessage> msg, + const qpid::asyncStore::AsyncOperation::opCode op, + boost::shared_ptr<MockPersistableQueue> q) : m_msg(msg), m_op(op), m_q(q) { assert(m_msg.get() != 0); - assert(m_q != 0); + assert(m_q.get() != 0); } -MessageContext::~MessageContext() +MessageAsyncContext::~MessageAsyncContext() {} qpid::asyncStore::AsyncOperation::opCode -MessageContext::getOpCode() const +MessageAsyncContext::getOpCode() const { return m_op; } const char* -MessageContext::getOpStr() const +MessageAsyncContext::getOpStr() const { return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -MockPersistableMessage::shared_ptr -MessageContext::getMessage() const +boost::shared_ptr<MockPersistableMessage> +MessageAsyncContext::getMessage() const { return m_msg; } -MockPersistableQueue* -MessageContext::getQueue() const +boost::shared_ptr<MockPersistableQueue> +MessageAsyncContext::getQueue() const { return m_q; } void -MessageContext::destroy() +MessageAsyncContext::destroy() { delete this; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h index 3a47b4dbe8..11da0d80bd 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h @@ -24,31 +24,35 @@ #ifndef tests_storePerfTools_asyncPerf_MessageContext_h_ #define tests_storePerfTools_asyncPerf_MessageContext_h_ -#include "MockPersistableMessage.h" - #include "qpid/asyncStore/AsyncOperation.h" -#include "qpid/broker/BrokerAsyncContext.h" +#include "qpid/broker/AsyncStore.h" // qpid::broker::BrokerAsyncContext + +#include <boost/shared_ptr.hpp> namespace tests { namespace storePerftools { namespace asyncPerf { -class MessageContext : public qpid::broker::BrokerAsyncContext +class MockPersistableMessage; +class MockPersistableQueue; + +class MessageAsyncContext : public qpid::broker::BrokerAsyncContext { public: - MessageContext(MockPersistableMessage::shared_ptr msg, - const qpid::asyncStore::AsyncOperation::opCode op, - MockPersistableQueue* q); - virtual ~MessageContext(); + MessageAsyncContext(boost::shared_ptr<MockPersistableMessage> msg, + const qpid::asyncStore::AsyncOperation::opCode op, + boost::shared_ptr<MockPersistableQueue> q); + virtual ~MessageAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - MockPersistableMessage::shared_ptr getMessage() const; - MockPersistableQueue* getQueue() const; + boost::shared_ptr<MockPersistableMessage> getMessage() const; + boost::shared_ptr<MockPersistableQueue> getQueue() const; void destroy(); + protected: - MockPersistableMessage::shared_ptr m_msg; + boost::shared_ptr<MockPersistableMessage> m_msg; const qpid::asyncStore::AsyncOperation::opCode m_op; - MockPersistableQueue* m_q; + boost::shared_ptr<MockPersistableQueue> m_q; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp new file mode 100644 index 0000000000..6042291a0a --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MessageConsumer.cpp + */ + +#include "MessageConsumer.h" + +#include "MockPersistableQueue.h" +#include "TestOptions.h" + +#include <stdint.h> // uint32_t + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockTransactionContext; + +MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, + boost::shared_ptr<MockPersistableQueue> queue) : + m_perfTestParams(perfTestParams), + m_queue(queue) +{} + +MessageConsumer::~MessageConsumer() +{} + +void* +MessageConsumer::runConsumers() +{ + uint32_t numMsgs = 0; + while (numMsgs < m_perfTestParams.m_numMsgs) { + if (m_queue->dispatch()) { + ++numMsgs; + } else { + ::usleep(1000); // TODO - replace this poller with condition variable + } + } + return 0; +} + +//static +void* +MessageConsumer::startConsumers(void* ptr) +{ + return reinterpret_cast<MessageConsumer*>(ptr)->runConsumers(); +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h new file mode 100644 index 0000000000..30305fbe1a --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MessageConsumer.h + */ + +#ifndef tests_storePerftools_asyncPerf_MessageConsumer_h_ +#define tests_storePerftools_asyncPerf_MessageConsumer_h_ + +#include "boost/shared_ptr.hpp" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockPersistableQueue; +class TestOptions; + +class MessageConsumer +{ +public: + MessageConsumer(const TestOptions& perfTestParams, + boost::shared_ptr<MockPersistableQueue> queue); + virtual ~MessageConsumer(); + + void* runConsumers(); + static void* startConsumers(void* ptr); +protected: + const TestOptions& m_perfTestParams; + boost::shared_ptr<MockPersistableQueue> m_queue; +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_MessageConsumer_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp new file mode 100644 index 0000000000..c61ce352a1 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MessageDeque.cpp + */ + +#include "MessageDeque.h" +#include "QueuedMessage.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +MessageDeque::MessageDeque() +{} + +MessageDeque::~MessageDeque() +{} + +uint32_t +MessageDeque::size() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); + return m_messages.size(); +} + +bool +MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*removed*/) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); + m_messages.push_back(added); + return false; +} + +bool +MessageDeque::consume(QueuedMessage& msg) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); + if (!m_messages.empty()) { + msg = m_messages.front(); + m_messages.pop_front(); + return true; + } + return false; +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h new file mode 100644 index 0000000000..93ca099923 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MessageDeque.h + */ + +/* + * This is a copy of qpid::broker::MessageDeque.h, but using the local + * tests::storePerftools::asyncPerf::QueuedMessage class instead of + * qpid::broker::QueuedMessage. + */ + +#ifndef tests_storePerftools_asyncPerf_MessageDeque_h_ +#define tests_storePerftools_asyncPerf_MessageDeque_h_ + +#include "Messages.h" + +#include "qpid/sys/Mutex.h" + +#include <deque> + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MessageDeque : public Messages +{ +public: + MessageDeque(); + virtual ~MessageDeque(); + uint32_t size(); + bool push(const QueuedMessage& added, QueuedMessage& removed); + bool consume(QueuedMessage& msg); +protected: + std::deque<QueuedMessage> m_messages; + qpid::sys::Mutex m_msgMutex; + +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_MessageDeque_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp new file mode 100644 index 0000000000..8540ff2b61 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MessageProducer.cpp + */ + +#include "MessageProducer.h" + +#include "MockPersistableMessage.h" +#include "MockPersistableQueue.h" +#include "TestOptions.h" + +#include <stdint.h> // uint32_t + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockTransactionContext; + +MessageProducer::MessageProducer(const TestOptions& perfTestParams, + const char* msgData, + qpid::asyncStore::AsyncStoreImpl* store, + boost::shared_ptr<MockPersistableQueue> queue) : + m_perfTestParams(perfTestParams), + m_msgData(msgData), + m_store(store), + m_queue(queue) +{} + +MessageProducer::~MessageProducer() +{} + +void* +MessageProducer::runProducers() +{ + boost::shared_ptr<MockTransactionContext> txn; + for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) { + boost::shared_ptr<MockPersistableMessage> msg(new MockPersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); + m_queue->deliver(msg); + } + return 0; +} + +//static +void* +MessageProducer::startProducers(void* ptr) +{ + return reinterpret_cast<MessageProducer*>(ptr)->runProducers(); +} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h new file mode 100644 index 0000000000..1b1f9b63fd --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file MessageProducer.h + */ + +#ifndef tests_storePerftools_asyncPerf_MessageProducer_h_ +#define tests_storePerftools_asyncPerf_MessageProducer_h_ + +#include "boost/shared_ptr.hpp" + +namespace qpid { +namespace asyncStore { + +class AsyncStoreImpl; + +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class MockPersistableQueue; +class TestOptions; + +class MessageProducer +{ +public: + MessageProducer(const TestOptions& perfTestParams, + const char* msgData, + qpid::asyncStore::AsyncStoreImpl* store, + boost::shared_ptr<MockPersistableQueue> queue); + virtual ~MessageProducer(); + void* runProducers(); + static void* startProducers(void* ptr); +protected: + const TestOptions& m_perfTestParams; + const char* m_msgData; + qpid::asyncStore::AsyncStoreImpl* m_store; + boost::shared_ptr<MockPersistableQueue> m_queue; +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_MessageProducer_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/tests/storePerftools/asyncPerf/Messages.h new file mode 100644 index 0000000000..9b5bd0be99 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/Messages.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file Messages.h + */ + +/* + * This is a copy of qpid::broker::Messages.h, but using the local + * tests::storePerftools::asyncPerf::QueuedMessage class instead of + * qpid::broker::QueuedMessage. + */ + +#ifndef tests_storePerftools_asyncPerf_Messages_h_ +#define tests_storePerftools_asyncPerf_Messages_h_ + +#include <stdint.h> + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class QueuedMessage; + +class Messages +{ +public: + virtual ~Messages() {} + virtual uint32_t size() = 0; + virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0; + virtual bool consume(QueuedMessage& msg) = 0; +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_Messages_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp index e89a98d02d..e7cab4d621 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp @@ -23,9 +23,6 @@ #include "MockPersistableMessage.h" -#include "MessageAsyncContext.h" -#include "MockPersistableQueue.h" // debug statements in enqueueComplete() and dequeueComplete() - #include "qpid/asyncStore/AsyncStoreImpl.h" namespace tests { @@ -34,46 +31,19 @@ namespace asyncPerf { MockPersistableMessage::MockPersistableMessage(const char* msgData, const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store, - const bool persistent) : + qpid::asyncStore::AsyncStoreImpl* store) : m_persistenceId(0ULL), m_msg(msgData, static_cast<size_t>(msgSize)), - m_persistent(persistent), - m_msgHandle(store->createMessageHandle(this)) + m_msgHandle(store ? store->createMessageHandle(this) : store->createMessageHandle(0)) {} MockPersistableMessage::~MockPersistableMessage() {} -// static -void -MockPersistableMessage::handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerAsyncContext* bc) +const qpid::broker::MessageHandle& +MockPersistableMessage::getHandle() const { - if (bc) { - MessageContext* mc = dynamic_cast<MessageContext*>(bc); - if (res->errNo) { - // TODO: Handle async failure here - std::cerr << "Message pid=0x" << std::hex << mc->getMessage()->m_persistenceId << std::dec << ": Operation " - << mc->getOpStr() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl; - } else { - // Handle async success here - switch(mc->getOpCode()) { - case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: - mc->getMessage()->dequeueComplete(mc); - break; - case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: - mc->getMessage()->enqueueComplete(mc); - break; - default: - std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::MockPersistableMessage::handleAsyncResult(): Unknown async queue operation: " << mc->getOpCode(); - throw qpid::Exception(oss.str()); - }; - } - } - if (bc) delete bc; - if (res) delete res; + return m_msgHandle; } qpid::broker::MessageHandle& @@ -119,7 +89,7 @@ MockPersistableMessage::encodedHeaderSize() const bool MockPersistableMessage::isPersistent() const { - return m_persistent; + return m_msgHandle.isValid(); } uint64_t @@ -134,20 +104,4 @@ MockPersistableMessage::write(char* target) ::memcpy(target, m_msg.data(), m_msg.size()); } -// protected -void -MockPersistableMessage::enqueueComplete(const MessageContext* mc) -{ -//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush; - assert(mc->getMessage().get() == this); -} - -// protected -void -MockPersistableMessage::dequeueComplete(const MessageContext* mc) -{ -//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush; - assert(mc->getMessage().get() == this); -} - }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h index c98bb8e843..fc1c3ee47a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h @@ -28,6 +28,8 @@ #include "qpid/broker/MessageHandle.h" #include "qpid/broker/PersistableMessage.h" +#include <set> + namespace qpid { namespace asyncStore { class AsyncStoreImpl; @@ -37,21 +39,17 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class MessageContext; class MockPersistableQueue; -class MockPersistableMessage: public qpid::broker::PersistableMessage, public qpid::broker::DataSource +class MockPersistableMessage: public qpid::broker::PersistableMessage, + public qpid::broker::DataSource { public: - typedef boost::shared_ptr<MockPersistableMessage> shared_ptr; - MockPersistableMessage(const char* msgData, const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store, - const bool persistent = true); + qpid::asyncStore::AsyncStoreImpl* store); virtual ~MockPersistableMessage(); - static void handleAsyncResult(const qpid::broker::AsyncResult* res, - qpid::broker::BrokerAsyncContext* bc); + const qpid::broker::MessageHandle& getHandle() const; qpid::broker::MessageHandle& getHandle(); // Interface Persistable @@ -72,13 +70,7 @@ public: protected: mutable uint64_t m_persistenceId; const std::string m_msg; - const bool m_persistent; qpid::broker::MessageHandle m_msgHandle; - - // --- Ascnc op completions (called through handleAsyncResult) --- - void enqueueComplete(const MessageContext* mc); - void dequeueComplete(const MessageContext* mc); - }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp index ede0830045..009f54a157 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp @@ -23,16 +23,13 @@ #include "MockPersistableQueue.h" -#include "MessageAsyncContext.h" +#include "MessageDeque.h" #include "MockPersistableMessage.h" #include "MockTransactionContext.h" #include "QueueAsyncContext.h" #include "QueuedMessage.h" -#include "TestOptions.h" #include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/BrokerAsyncContext.h" -#include "qpid/broker/EnqueueHandle.h" namespace tests { namespace storePerftools { @@ -40,19 +37,22 @@ namespace asyncPerf { MockPersistableQueue::MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store, - const TestOptions& to, - const char* msgData) : + qpid::asyncStore::AsyncStoreImpl* store) : qpid::broker::PersistableQueue(), m_name(name), m_store(store), + m_asyncOpCounter(0UL), m_persistenceId(0ULL), m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. - m_perfTestOpts(to), - m_msgData(msgData) + m_destroyPending(false), + m_destroyed(false), + m_barrier(*this), + m_messages(new MessageDeque()) { - const qpid::types::Variant::Map qo; - m_queueHandle = m_store->createQueueHandle(m_name, qo); + if (m_store != 0) { + const qpid::types::Variant::Map qo; + m_queueHandle = m_store->createQueueHandle(m_name, qo); + } } MockPersistableQueue::~MockPersistableQueue() @@ -71,7 +71,7 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, if (bc && res) { QueueAsyncContext* qc = dynamic_cast<QueueAsyncContext*>(bc); if (res->errNo) { - // TODO: Handle async failure here + // TODO: Handle async failure here (other than by simply printing a message) std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl; } else { @@ -86,6 +86,12 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY: qc->getQueue()->destroyComplete(qc); break; + case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE: + qc->getQueue()->enqueueComplete(qc); + break; + case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE: + qc->getQueue()->dequeueComplete(qc); + break; default: std::ostringstream oss; oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); @@ -97,127 +103,100 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res, if (res) delete res; } +const qpid::broker::QueueHandle& +MockPersistableQueue::getHandle() const +{ + return m_queueHandle; +} + qpid::broker::QueueHandle& MockPersistableQueue::getHandle() { return m_queueHandle; } -void -MockPersistableQueue::asyncStoreCreate() +qpid::asyncStore::AsyncStoreImpl* +MockPersistableQueue::getStore() { - m_store->submitCreate(m_queueHandle, - this, - &handleAsyncResult, - new QueueAsyncContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); + return m_store; } void -MockPersistableQueue::asyncStoreDestroy() +MockPersistableQueue::asyncCreate() { - m_store->submitDestroy(m_queueHandle, - &handleAsyncResult, - new QueueAsyncContext(this, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); + if (m_store) { + m_store->submitCreate(m_queueHandle, + this, + &handleAsyncResult, + new QueueAsyncContext(shared_from_this(), + qpid::asyncStore::AsyncOperation::QUEUE_CREATE)); + ++m_asyncOpCounter; + } } -void* -MockPersistableQueue::runEnqueues() +void +MockPersistableQueue::asyncDestroy(const bool deleteQueue) { - uint32_t numMsgs = 0; - uint16_t txnCnt = 0; - const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0; - MockTransactionContextPtr txn; - while (numMsgs < m_perfTestOpts.m_numMsgs) { - if (useTxn && txnCnt == 0) { - txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() + m_destroyPending = true; + if (m_store) { + if (deleteQueue) { + m_store->submitDestroy(m_queueHandle, + &handleAsyncResult, + new QueueAsyncContext(shared_from_this(), + qpid::asyncStore::AsyncOperation::QUEUE_DESTROY)); + ++m_asyncOpCounter; } - MockPersistableMessage::shared_ptr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true)); - msg->setPersistenceId(m_store->getNextRid()); - qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle); - MessageContext* msgCtxt = new MessageContext(msg, - qpid::asyncStore::AsyncOperation::MSG_ENQUEUE, - this); - if (useTxn) { - m_store->submitEnqueue(enqHandle, - txn->getHandle(), - &MockPersistableMessage::handleAsyncResult, - dynamic_cast<qpid::broker::BrokerAsyncContext*>(msgCtxt)); - } else { - m_store->submitEnqueue(enqHandle, - &MockPersistableMessage::handleAsyncResult, - dynamic_cast<qpid::broker::BrokerAsyncContext*>(msgCtxt)); - } - QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn)); - push(qm); - if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) { - txn->commit(); - txnCnt = 0; - } - ++numMsgs; - } - if (txnCnt > 0) { - txn->commit(); - txnCnt = 0; + m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); } - return 0; } -void* -MockPersistableQueue::runDequeues() +void +MockPersistableQueue::deliver(boost::shared_ptr<MockPersistableMessage> msg) { - uint32_t numMsgs = 0; - const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue; - uint16_t txnCnt = 0; - const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0; - MockTransactionContextPtr txn; - QueuedMessagePtr qm; - while (numMsgs < numMsgsToDequeue) { - if (useTxn && txnCnt == 0) { - txn.reset(new MockTransactionContext(m_store)); // equivalent to begin() - } - pop(qm); - if (qm.get()) { - qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle(); - qpid::broker::BrokerAsyncContext* bc = new MessageContext(qm->getMessage(), - qpid::asyncStore::AsyncOperation::MSG_DEQUEUE, - this); - if (useTxn) { - m_store->submitDequeue(enqHandle, - txn->getHandle(), - &MockPersistableMessage::handleAsyncResult, - bc); - } else { - m_store->submitDequeue(enqHandle, - &MockPersistableMessage::handleAsyncResult, - bc); - } - ++numMsgs; - qm.reset(static_cast<QueuedMessage*>(0)); - if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) { - txn->commit(); - txnCnt = 0; - } - } + QueuedMessage qm(this, msg); + if(enqueue((MockTransactionContext*)0, qm)) { + push(qm); } - if (txnCnt > 0) { - txn->commit(); - txnCnt = 0; +} + +bool +MockPersistableQueue::dispatch() +{ + QueuedMessage qm; + if (m_messages->consume(qm)) { + return dequeue((MockTransactionContext*)0, qm); } - return 0; + return false; } -//static -void* -MockPersistableQueue::startEnqueues(void* ptr) +bool +MockPersistableQueue::enqueue(MockTransactionContext* ctxt, + QueuedMessage& qm) { - return reinterpret_cast<MockPersistableQueue*>(ptr)->runEnqueues(); + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm.payload()->isPersistent() && m_store) { + qm.payload()->enqueueAsync(shared_from_this(), m_store); + return asyncEnqueue(ctxt, qm); + } + return false; } -//static -void* -MockPersistableQueue::startDequeues(void* ptr) +bool +MockPersistableQueue::dequeue(MockTransactionContext* ctxt, + QueuedMessage& qm) { - return reinterpret_cast<MockPersistableQueue*>(ptr)->runDequeues(); + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm.payload()->isPersistent() && m_store) { + qm.payload()->dequeueAsync(shared_from_this(), m_store); + return asyncDequeue(ctxt, qm); + } + return false; } void @@ -276,61 +255,158 @@ MockPersistableQueue::write(char* target) ::memcpy(target, m_persistableData.data(), m_persistableData.size()); } +// --- Members & methods in msg handling path from qpid::Queue --- + +// protected +MockPersistableQueue::UsageBarrier::UsageBarrier(MockPersistableQueue& q) : + m_parent(q), + m_count(0) +{} + +// protected +bool +MockPersistableQueue::UsageBarrier::acquire() +{ + qpid::sys::Monitor::ScopedLock l(m_monitor); + if (m_parent.m_destroyed) { + return false; + } else { + ++m_count; + return true; + } +} + +// protected +void MockPersistableQueue::UsageBarrier::release() +{ + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + if (--m_count == 0) { + m_monitor.notifyAll(); + } +} + +// protected +void MockPersistableQueue::UsageBarrier::destroy() +{ + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + m_parent.m_destroyed = true; + while (m_count) { + m_monitor.wait(); + } +} + +// protected +MockPersistableQueue::ScopedUse::ScopedUse(UsageBarrier& b) : + m_barrier(b), + m_acquired(m_barrier.acquire()) +{} + +// protected +MockPersistableQueue::ScopedUse::~ScopedUse() +{ + if (m_acquired) { + m_barrier.release(); + } +} + +// protected +void +MockPersistableQueue::push(QueuedMessage& qm, + bool /*isRecovery*/) +{ + QueuedMessage removed; + m_messages->push(qm, removed); +} + +// --- End Members & methods in msg handling path from qpid::Queue --- + +// protected +bool +MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn, + QueuedMessage& qm) +{ + qm.payload()->setPersistenceId(m_store->getNextRid()); +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + m_store->submitEnqueue(/*enqHandle*/qm.enqHandle(), + txn->getHandle(), + &handleAsyncResult, + new QueueAsyncContext(shared_from_this(), + qm.payload(), + qpid::asyncStore::AsyncOperation::MSG_ENQUEUE)); + ++m_asyncOpCounter; + return true; +} + +// protected +bool +MockPersistableQueue::asyncDequeue(MockTransactionContext* txn, + QueuedMessage& qm) +{ +//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; + qpid::broker::EnqueueHandle enqHandle = qm.enqHandle(); + m_store->submitDequeue(enqHandle, + txn->getHandle(), + &handleAsyncResult, + new QueueAsyncContext(shared_from_this(), + qm.payload(), + qpid::asyncStore::AsyncOperation::MSG_DEQUEUE)); + ++m_asyncOpCounter; + return true; +} + +// protected +void +MockPersistableQueue::destroyCheck(const std::string& opDescr) const +{ + if (m_destroyPending || m_destroyed) { + std::ostringstream oss; + oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; + throw qpid::Exception(oss.str()); + } +} + // protected void MockPersistableQueue::createComplete(const QueueAsyncContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": createComplete()" << std::endl << std::flush; +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); + --m_asyncOpCounter; } // protected void MockPersistableQueue::flushComplete(const QueueAsyncContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": flushComplete()" << std::endl << std::flush; +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); + --m_asyncOpCounter; } // protected void MockPersistableQueue::destroyComplete(const QueueAsyncContext* qc) { -//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": destroyComplete()" << std::endl << std::flush; +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); + --m_asyncOpCounter; + m_destroyed = true; } -// protected void -MockPersistableQueue::push(QueuedMessagePtr& qm) +MockPersistableQueue::enqueueComplete(const QueueAsyncContext* qc) { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); - m_enqueuedMsgs.push_back(qm); - m_dequeueCondition.notify(); +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; } -// protected void -MockPersistableQueue::pop(QueuedMessagePtr& qm) +MockPersistableQueue::dequeueComplete(const QueueAsyncContext* qc) { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); - while (m_enqueuedMsgs.empty()) { - m_dequeueCondition.wait(m_enqueuedMsgsMutex); - } - qm = m_enqueuedMsgs.front(); - if (qm->isTransactional()) { - // The next msg is still in an open transaction, skip and find next non-open-txn msg - MsgEnqListItr i = m_enqueuedMsgs.begin(); - while (++i != m_enqueuedMsgs.end()) { - if (!(*i)->isTransactional()) { - qm = *i; - m_enqueuedMsgs.erase(i); - } - } - } else { - // The next msg is not in an open txn - m_enqueuedMsgs.pop_front(); - } +//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; + assert(qc->getQueue().get() == this); + --m_asyncOpCounter; } }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h index 2d7b1f1c4e..ff6db93542 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h @@ -24,19 +24,21 @@ #ifndef tests_storePerftools_asyncPerf_MockPersistableQueue_h_ #define tests_storePerftools_asyncPerf_MockPersistableQueue_h_ +#include "AtomicCounter.h" // AsyncOpCounter + #include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/QueueHandle.h" -#include "qpid/sys/Condition.h" -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" -#include <boost/intrusive_ptr.hpp> -#include <deque> +#include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> namespace qpid { namespace asyncStore { class AsyncStoreImpl; } + namespace framing { class FieldTable; }} @@ -45,36 +47,39 @@ namespace tests { namespace storePerftools { namespace asyncPerf { +class Messages; +class MockPersistableMessage; +class MockPersistableQueue; +class MockTransactionContext; class QueueAsyncContext; class QueuedMessage; -class TestOptions; - -typedef boost::shared_ptr<QueuedMessage> QueuedMessagePtr; -class MockPersistableQueue : public qpid::broker::PersistableQueue, public qpid::broker::DataSource +class MockPersistableQueue : public boost::enable_shared_from_this<MockPersistableQueue>, + public qpid::broker::PersistableQueue, + public qpid::broker::DataSource { public: - typedef boost::intrusive_ptr<MockPersistableQueue> intrusive_ptr; - MockPersistableQueue(const std::string& name, const qpid::framing::FieldTable& args, - qpid::asyncStore::AsyncStoreImpl* store, - const TestOptions& perfTestParams, - const char* msgData); + qpid::asyncStore::AsyncStoreImpl* store); virtual ~MockPersistableQueue(); - // --- Async functionality --- static void handleAsyncResult(const qpid::broker::AsyncResult* res, qpid::broker::BrokerAsyncContext* bc); + const qpid::broker::QueueHandle& getHandle() const; qpid::broker::QueueHandle& getHandle(); - void asyncStoreCreate(); - void asyncStoreDestroy(); + qpid::asyncStore::AsyncStoreImpl* getStore(); + + void asyncCreate(); + void asyncDestroy(const bool deleteQueue); - // --- Performance test thread entry points --- - void* runEnqueues(); - void* runDequeues(); - static void* startEnqueues(void* ptr); - static void* startDequeues(void* ptr); + // --- Methods in msg handling path from qpid::Queue --- + void deliver(boost::shared_ptr<MockPersistableMessage> msg); + bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param + bool enqueue(MockTransactionContext* ctxt, + QueuedMessage& qm); + bool dequeue(MockTransactionContext* ctxt, + QueuedMessage& qm); // --- Interface qpid::broker::Persistable --- virtual void encode(qpid::framing::Buffer& buffer) const; @@ -94,28 +99,51 @@ public: protected: const std::string m_name; qpid::asyncStore::AsyncStoreImpl* m_store; + AsyncOpCounter m_asyncOpCounter; mutable uint64_t m_persistenceId; std::string m_persistableData; qpid::broker::QueueHandle m_queueHandle; - - // Test params - const TestOptions& m_perfTestOpts; - const char* m_msgData; - - typedef std::deque<QueuedMessagePtr> MsgEnqList; - typedef MsgEnqList::iterator MsgEnqListItr; - MsgEnqList m_enqueuedMsgs; - qpid::sys::Mutex m_enqueuedMsgsMutex; - qpid::sys::Condition m_dequeueCondition; - - // --- Ascnc op completions (called through handleAsyncResult) --- + bool m_destroyPending; + bool m_destroyed; + + // --- Members & methods in msg handling path copied from qpid::Queue --- + struct UsageBarrier + { + MockPersistableQueue& m_parent; + uint32_t m_count; + qpid::sys::Monitor m_monitor; + UsageBarrier(MockPersistableQueue& q); + bool acquire(); + void release(); + void destroy(); + }; + struct ScopedUse + { + UsageBarrier& m_barrier; + const bool m_acquired; + ScopedUse(UsageBarrier& b); + ~ScopedUse(); + }; + UsageBarrier m_barrier; + std::auto_ptr<Messages> m_messages; + void push(QueuedMessage& qm, + bool isRecovery = false); + + // -- Async ops --- + bool asyncEnqueue(MockTransactionContext* txn, + QueuedMessage& qm); + bool asyncDequeue(MockTransactionContext* txn, + QueuedMessage& qm); + + // --- Async op counter --- + void destroyCheck(const std::string& opDescr) const; + + // --- Async op completions (called through handleAsyncResult) --- void createComplete(const QueueAsyncContext* qc); void flushComplete(const QueueAsyncContext* qc); void destroyComplete(const QueueAsyncContext* qc); - - // --- Queue functionality --- - void push(QueuedMessagePtr& msg); - void pop(QueuedMessagePtr& msg); + void enqueueComplete(const QueueAsyncContext* qc); + void dequeueComplete(const QueueAsyncContext* qc); }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp index 05f06e95a1..c444f596e5 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp @@ -23,24 +23,41 @@ #include "MockTransactionContext.h" -#include "QueuedMessage.h" #include "TransactionAsyncContext.h" #include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/BrokerAsyncContext.h" + +#include <uuid/uuid.h> namespace tests { namespace storePerftools { namespace asyncPerf { +MockTransactionContext::MockTransactionContext(const std::string& xid) : + qpid::broker::TransactionContext(), + m_xid(xid), + m_tpcFlag(!xid.empty()), + m_store(0), + m_txnHandle(0), + m_prepared(false), + m_enqueuedMsgs() +{ + if (!m_tpcFlag) { + setLocalXid(); + } +//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; +} + MockTransactionContext::MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, const std::string& xid) : m_store(store), - m_txnHandle(store->createTxnHandle(xid)), m_prepared(false), m_enqueuedMsgs() { //std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; + if (m_store != 0) { + m_txnHandle = store->createTxnHandle(xid); + } } MockTransactionContext::~MockTransactionContext() @@ -80,6 +97,12 @@ MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, if (res) delete res; } +const qpid::broker::TxnHandle& +MockTransactionContext::getHandle() const +{ + return m_txnHandle; +} + qpid::broker::TxnHandle& MockTransactionContext::getHandle() { @@ -89,13 +112,13 @@ MockTransactionContext::getHandle() bool MockTransactionContext::is2pc() const { - return m_txnHandle.is2pc(); + return m_tpcFlag; } const std::string& MockTransactionContext::getXid() const { - return m_txnHandle.getXid(); + return m_xid; } void @@ -108,7 +131,7 @@ MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm) void MockTransactionContext::prepare() { - if (m_txnHandle.is2pc()) { + if (m_tpcFlag) { localPrepare(); m_prepared = true; } @@ -126,9 +149,11 @@ MockTransactionContext::abort() if (!m_prepared) { localPrepare(); } - m_store->submitAbort(m_txnHandle, - &handleAsyncResult, - dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); + if (m_store != 0) { +// m_store->submitAbort(m_txnHandle, +// &handleAsyncResult, +// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); + } //std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; } @@ -145,9 +170,11 @@ MockTransactionContext::commit() } else { localPrepare(); } - m_store->submitCommit(m_txnHandle, - &handleAsyncResult, - dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); + if (m_store != 0) { +// m_store->submitCommit(m_txnHandle, +// &handleAsyncResult, +// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); + } //std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; } @@ -156,23 +183,38 @@ MockTransactionContext::commit() void MockTransactionContext::localPrepare() { - m_store->submitPrepare(m_txnHandle, - &handleAsyncResult, - dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); + if (m_store != 0) { +// m_store->submitPrepare(m_txnHandle, +// &handleAsyncResult, +// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); + } //std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; } // protected void -MockTransactionContext::prepareComplete(const TransactionAsyncContext* tc) +MockTransactionContext::setLocalXid() +{ + uuid_t uuid; + // TODO: Valgrind warning: Possible race condition in uuid_generate_random() - is it thread-safe, and if not, does it matter? + // If this race condition affects the randomness of the UUID, then there could be a problem here. + ::uuid_generate_random(uuid); + char uuidStr[37]; // 36-char uuid + trailing '\0' + ::uuid_unparse(uuid, uuidStr); + m_xid.assign(uuidStr); +} + +// protected +void +MockTransactionContext::prepareComplete(const TransactionAsyncContext* /*tc*/) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); - while (!m_enqueuedMsgs.empty()) { - m_enqueuedMsgs.front()->clearTransaction(); - m_enqueuedMsgs.pop_front(); - } +// while (!m_enqueuedMsgs.empty()) { +// m_enqueuedMsgs.front()->clearTransaction(); +// m_enqueuedMsgs.pop_front(); +// } //std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext() == this); +// assert(tc->getTransactionContext().get() == this); } @@ -181,7 +223,7 @@ void MockTransactionContext::abortComplete(const TransactionAsyncContext* tc) { //std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext() == this); + assert(tc->getTransactionContext().get() == this); } @@ -190,7 +232,7 @@ void MockTransactionContext::commitComplete(const TransactionAsyncContext* tc) { //std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext() == this); + assert(tc->getTransactionContext().get() == this); } }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h index 2f8dd716f4..3f70b0bfda 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h @@ -24,17 +24,19 @@ #ifndef tests_storePerftools_asyncPerf_MockTransactionContext_h_ #define tests_storePerftools_asyncPerf_MockTransactionContext_h_ -#include "qpid/broker/AsyncStore.h" // qpid::broker::AsyncResult #include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext #include "qpid/broker/TxnHandle.h" #include "qpid/sys/Mutex.h" -#include <boost/shared_ptr.hpp> #include <deque> namespace qpid { namespace asyncStore { class AsyncStoreImpl; +} +namespace broker { +class AsyncResult; +class BrokerAsyncContext; }} namespace tests { @@ -47,14 +49,14 @@ class TransactionAsyncContext; class MockTransactionContext : public qpid::broker::TransactionContext { public: - typedef boost::shared_ptr<MockTransactionContext> shared_ptr; - + MockTransactionContext(const std::string& xid = std::string()); MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, const std::string& xid = std::string()); virtual ~MockTransactionContext(); static void handleAsyncResult(const qpid::broker::AsyncResult* res, qpid::broker::BrokerAsyncContext* bc); + const qpid::broker::TxnHandle& getHandle() const; qpid::broker::TxnHandle& getHandle(); bool is2pc() const; const std::string& getXid() const; @@ -65,6 +67,8 @@ public: void commit(); protected: + std::string m_xid; + bool m_tpcFlag; qpid::asyncStore::AsyncStoreImpl* m_store; qpid::broker::TxnHandle m_txnHandle; bool m_prepared; @@ -72,6 +76,7 @@ protected: qpid::sys::Mutex m_enqueuedMsgsMutex; void localPrepare(); + void setLocalXid(); // --- Ascnc op completions (called through handleAsyncResult) --- void prepareComplete(const TransactionAsyncContext* tc); diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 7387c348fd..184a899570 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -23,6 +23,8 @@ #include "PerfTest.h" +#include "MessageConsumer.h" +#include "MessageProducer.h" #include "MockPersistableQueue.h" #include "tests/storePerftools/version.h" @@ -30,6 +32,7 @@ #include "tests/storePerftools/common/Thread.h" #include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/sys/Poller.h" #include <iomanip> @@ -56,8 +59,9 @@ PerfTest::~PerfTest() m_pollingThread.join(); m_queueList.clear(); + m_queueList.clear(); + m_producers.clear(); - if (m_store) delete m_store; delete[] m_msgData; } @@ -69,13 +73,21 @@ PerfTest::prepareStore() } void +PerfTest::destroyStore() +{ + if (m_store) { + delete m_store; + } +} + +void PerfTest::prepareQueues() { for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - MockPersistableQueue::intrusive_ptr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_testOpts, m_msgData)); - mpq->asyncStoreCreate(); + boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store)); + mpq->asyncCreate(); m_queueList.push_back(mpq); } } @@ -83,32 +95,38 @@ PerfTest::prepareQueues() void PerfTest::destroyQueues() { - for (std::deque<MockPersistableQueue::intrusive_ptr>::iterator i=m_queueList.begin(); i!=m_queueList.end(); ++i) { - (*i)->asyncStoreDestroy(); + while (m_queueList.size() > 0) { + m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion); + m_queueList.pop_front(); } } void PerfTest::run() { - typedef boost::shared_ptr<tests::storePerftools::common::Thread> ThreadPtr; // TODO - replace with qpid threads - - prepareStore(); + if (m_testOpts.m_durable) { + prepareStore(); + } prepareQueues(); - std::deque<ThreadPtr> threads; + // TODO: replace with qpid::sys::Thread + std::deque<boost::shared_ptr<tests::storePerftools::common::Thread> > threads; { // --- Start of timed section --- tests::storePerftools::common::ScopedTimer st(m_testResult); for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) { + boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_queueList[q])); + m_producers.push_back(mp); for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads - ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startEnqueues, - reinterpret_cast<void*>(m_queueList[q].get()))); + boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mp->startProducers, + reinterpret_cast<void*>(mp.get()))); threads.push_back(tp); } + boost::shared_ptr<MessageConsumer> mc(new MessageConsumer(m_testOpts, m_queueList[q])); + m_consumers.push_back(mc); for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads - ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startDequeues, - reinterpret_cast<void*>(m_queueList[q].get()))); + boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mc->startConsumers, + reinterpret_cast<void*>(mc.get()))); threads.push_back(tp); } } @@ -117,8 +135,8 @@ PerfTest::run() threads.pop_front(); } } // --- End of timed section --- - // TODO: Add test param to allow queues to be destroyed or left when test ends destroyQueues(); + destroyStore(); } void @@ -172,6 +190,6 @@ main(int argc, char** argv) // Print test result std::cout << apt << std::endl; - ::sleep(1); + //::sleep(1); return 0; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index 2b1e65f871..3bd3f6bd32 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -24,13 +24,11 @@ #ifndef tests_storePerftools_asyncPerf_PerfTest_h_ #define tests_storePerftools_asyncPerf_PerfTest_h_ -#include "MockPersistableQueue.h" #include "TestResult.h" #include "tests/storePerftools/common/Streamable.h" #include "qpid/framing/FieldTable.h" -#include "qpid/sys/Poller.h" #include "qpid/sys/Thread.h" #include <boost/shared_ptr.hpp> @@ -40,12 +38,18 @@ namespace qpid { namespace asyncStore { class AsyncStoreImpl; class AsyncStoreOptions; +} +namespace sys { +class Poller; }} namespace tests { namespace storePerftools { namespace asyncPerf { +class MockPersistableQueue; +class MessageConsumer; +class MessageProducer; class TestOptions; class PerfTest : public tests::storePerftools::common::Streamable @@ -66,9 +70,12 @@ protected: boost::shared_ptr<qpid::sys::Poller> m_poller; qpid::sys::Thread m_pollingThread; qpid::asyncStore::AsyncStoreImpl* m_store; - std::deque<MockPersistableQueue::intrusive_ptr> m_queueList; + std::deque<boost::shared_ptr<MockPersistableQueue> > m_queueList; + std::deque<boost::shared_ptr<MessageProducer> > m_producers; + std::deque<boost::shared_ptr<MessageConsumer> > m_consumers; void prepareStore(); + void destroyStore(); void prepareQueues(); void destroyQueues(); diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp index 281fc03e2c..be0c087390 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp @@ -23,17 +23,29 @@ #include "QueueAsyncContext.h" +#include <cassert> + namespace tests { namespace storePerftools { namespace asyncPerf { -QueueAsyncContext::QueueAsyncContext(MockPersistableQueue::intrusive_ptr q, - const qpid::asyncStore::AsyncOperation::opCode op) : - qpid::broker::BrokerAsyncContext(), +QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q, + const qpid::asyncStore::AsyncOperation::opCode op) : + m_q(q), + m_op(op) +{ + assert(m_q.get() != 0); +} + +QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q, + boost::shared_ptr<MockPersistableMessage> msg, + const qpid::asyncStore::AsyncOperation::opCode op) : m_q(q), + m_msg(msg), m_op(op) { assert(m_q.get() != 0); + assert(m_msg.get() != 0); } QueueAsyncContext::~QueueAsyncContext() @@ -51,12 +63,18 @@ QueueAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -MockPersistableQueue::intrusive_ptr +boost::shared_ptr<MockPersistableQueue> QueueAsyncContext::getQueue() const { return m_q; } +boost::shared_ptr<MockPersistableMessage> +QueueAsyncContext::getMessage() const +{ + return m_msg; +} + void QueueAsyncContext::destroy() { diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index 657e80694a..2b6b3778cd 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -24,26 +24,36 @@ #ifndef tests_storePerftools_asyncPerf_QueueContext_h_ #define tests_storePerftools_asyncPerf_QueueContext_h_ -#include "MockPersistableQueue.h" #include "qpid/asyncStore/AsyncOperation.h" -#include "qpid/broker/BrokerAsyncContext.h" +#include "qpid/broker/AsyncStore.h" + +#include <boost/shared_ptr.hpp> namespace tests { namespace storePerftools { namespace asyncPerf { +class MockPersistableMessage; +class MockPersistableQueue; + class QueueAsyncContext: public qpid::broker::BrokerAsyncContext { public: - QueueAsyncContext(MockPersistableQueue::intrusive_ptr q, - const qpid::asyncStore::AsyncOperation::opCode op); + QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q, + const qpid::asyncStore::AsyncOperation::opCode op); + QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q, + boost::shared_ptr<MockPersistableMessage> msg, + const qpid::asyncStore::AsyncOperation::opCode op); virtual ~QueueAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - MockPersistableQueue::intrusive_ptr getQueue() const; + boost::shared_ptr<MockPersistableQueue> getQueue() const; + boost::shared_ptr<MockPersistableMessage> getMessage() const; void destroy(); + protected: - MockPersistableQueue::intrusive_ptr m_q; + boost::shared_ptr<MockPersistableQueue> m_q; + boost::shared_ptr<MockPersistableMessage> m_msg; const qpid::asyncStore::AsyncOperation::opCode m_op; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index 9e5e131a28..7903d6551a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -23,20 +23,71 @@ #include "QueuedMessage.h" -#include "MockTransactionContext.h" +#include "MockPersistableMessage.h" +#include "MockPersistableQueue.h" + +#include "qpid/asyncStore/AsyncStoreImpl.h" namespace tests { namespace storePerftools { namespace asyncPerf { -QueuedMessage::QueuedMessage(MockPersistableMessage::shared_ptr msg, +QueuedMessage::QueuedMessage() : + m_queue(0) +{} + +QueuedMessage::QueuedMessage(MockPersistableQueue* q, + boost::shared_ptr<MockPersistableMessage> msg) : + m_queue(q), + m_msg(msg), + m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle())) +{} + +QueuedMessage::QueuedMessage(const QueuedMessage& qm) : + m_queue(qm.m_queue), + m_msg(qm.m_msg), + m_enqHandle(qm.m_enqHandle) +{} + +QueuedMessage::~QueuedMessage() +{} + +QueuedMessage& +QueuedMessage::operator=(const QueuedMessage& rhs) +{ + m_queue = rhs.m_queue; + m_msg = rhs.m_msg; + m_enqHandle = rhs.m_enqHandle; + return *this; +} + +boost::shared_ptr<MockPersistableMessage> +QueuedMessage::payload() const +{ + return m_msg; +} + +const qpid::broker::EnqueueHandle& +QueuedMessage::enqHandle() const +{ + return m_enqHandle; +} + +qpid::broker::EnqueueHandle& +QueuedMessage::enqHandle() +{ + return m_enqHandle; +} + +/* +QueuedMessage::QueuedMessage(boost::shared_ptr<MockPersistableMessage> msg, qpid::broker::EnqueueHandle& enqHandle, - MockTransactionContextPtr txn) : + boost::shared_ptr<MockTransactionContext> txn) : m_msg(msg), m_enqHandle(enqHandle), m_txn(txn) { - if (txn) { + if (txn.get()) { txn->addEnqueuedMsg(this); } } @@ -44,7 +95,7 @@ QueuedMessage::QueuedMessage(MockPersistableMessage::shared_ptr msg, QueuedMessage::~QueuedMessage() {} -MockPersistableMessage::shared_ptr +boost::shared_ptr<MockPersistableMessage> QueuedMessage::getMessage() const { return m_msg; @@ -56,7 +107,7 @@ QueuedMessage::getEnqueueHandle() const return m_enqHandle; } -MockTransactionContextPtr +boost::shared_ptr<MockTransactionContext> QueuedMessage::getTransactionContext() const { return m_txn; @@ -73,5 +124,6 @@ QueuedMessage::clearTransaction() { m_txn.reset(static_cast<MockTransactionContext*>(0)); } +*/ }}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index a11b23888a..9ad67cc925 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -24,35 +24,33 @@ #ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_ #define tests_storePerftools_asyncPerf_QueuedMessage_h_ -#include "MockPersistableMessage.h" - #include "qpid/broker/EnqueueHandle.h" + #include <boost/shared_ptr.hpp> namespace tests { namespace storePerftools { namespace asyncPerf { -class MockTransactionContext; -typedef boost::shared_ptr<MockTransactionContext> MockTransactionContextPtr; +class MockPersistableMessage; +class MockPersistableQueue; class QueuedMessage { public: - QueuedMessage(MockPersistableMessage::shared_ptr msg, - qpid::broker::EnqueueHandle& enqHandle, - MockTransactionContextPtr txn); - virtual ~QueuedMessage(); - MockPersistableMessage::shared_ptr getMessage() const; - qpid::broker::EnqueueHandle getEnqueueHandle() const; - MockTransactionContextPtr getTransactionContext() const; - bool isTransactional() const; - void clearTransaction(); - + QueuedMessage(); + QueuedMessage(MockPersistableQueue* q, + boost::shared_ptr<MockPersistableMessage> msg); + QueuedMessage(const QueuedMessage& qm); + ~QueuedMessage(); + QueuedMessage& operator=(const QueuedMessage& rhs); + boost::shared_ptr<MockPersistableMessage> payload() const; + const qpid::broker::EnqueueHandle& enqHandle() const; + qpid::broker::EnqueueHandle& enqHandle(); protected: - MockPersistableMessage::shared_ptr m_msg; + MockPersistableQueue* m_queue; + boost::shared_ptr<MockPersistableMessage> m_msg; qpid::broker::EnqueueHandle m_enqHandle; - MockTransactionContextPtr m_txn; }; }}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp index 27784ef661..2f4461e8b5 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp @@ -30,11 +30,15 @@ namespace asyncPerf { // static declarations uint16_t TestOptions::s_defaultEnqTxnBlkSize = 0; uint16_t TestOptions::s_defaultDeqTxnBlkSize = 0; +bool TestOptions::s_defaultDurable = false; +bool TestOptions::s_defaultDestroyQueuesOnCompletion = false; TestOptions::TestOptions(const std::string& name) : tests::storePerftools::common::TestOptions(name), m_enqTxnBlockSize(s_defaultEnqTxnBlkSize), - m_deqTxnBlockSize(s_defaultDeqTxnBlkSize) + m_deqTxnBlockSize(s_defaultDeqTxnBlkSize), + m_durable(s_defaultDurable), + m_destroyQueuesOnCompletion(s_defaultDestroyQueuesOnCompletion) { doAddOptions(); } @@ -46,10 +50,14 @@ TestOptions::TestOptions(const uint32_t numMsgs, const uint16_t numDeqThreadsPerQueue, const uint16_t enqTxnBlockSize, const uint16_t deqTxnBlockSize, + const bool durable, + const bool destroyQueuesOnCompletion, const std::string& name) : tests::storePerftools::common::TestOptions(numMsgs, msgSize, numQueues, numEnqThreadsPerQueue, numDeqThreadsPerQueue, name), m_enqTxnBlockSize(enqTxnBlockSize), - m_deqTxnBlockSize(deqTxnBlockSize) + m_deqTxnBlockSize(deqTxnBlockSize), + m_durable(durable), + m_destroyQueuesOnCompletion(destroyQueuesOnCompletion) { doAddOptions(); } @@ -63,6 +71,8 @@ TestOptions::printVals(std::ostream& os) const tests::storePerftools::common::TestOptions::printVals(os); os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl; os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl; + os << " Durable [--durable]: " << (m_durable ? "true" : "false") << std::endl; + os << " Destroy queues on test completion [--destroy-queues]: " << (m_destroyQueuesOnCompletion ? "true" : "false") << std::endl; } void @@ -73,6 +83,10 @@ TestOptions::doAddOptions() "Num enqueus per transaction (0 = no transactions)") ("deq-txn-size,d", qpid::optValue(m_deqTxnBlockSize, "N"), "Num dequeues per transaction (0 = no transactions)") + ("durable", qpid::optValue(m_durable), + "Queues and messages are durable") + ("destroy-queues", qpid::optValue(m_destroyQueuesOnCompletion), + "Destroy queue recoreds persistent store on test completion") ; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h index 76b18717fa..b7e1c0a7a8 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h @@ -41,16 +41,22 @@ public: const uint16_t numDeqThreadsPerQueue, const uint16_t enqTxnBlockSize, const uint16_t deqTxnBlockSize, + const bool durable, + const bool destroyQueuesOnCompletion, const std::string& name="Test Options"); virtual ~TestOptions(); void printVals(std::ostream& os) const; uint16_t m_enqTxnBlockSize; ///< Transaction block size for enqueues uint16_t m_deqTxnBlockSize; ///< Transaction block size for dequeues + bool m_durable; ///< Use durable queues and messages for test + bool m_destroyQueuesOnCompletion; ///< Destroy durable queues on completion of test protected: static uint16_t s_defaultEnqTxnBlkSize; ///< Default transaction block size for enqueues static uint16_t s_defaultDeqTxnBlkSize; ///< Default transaction block size for dequeues + static bool s_defaultDurable; ///< Default flag for using durable queues and messages for test + static bool s_defaultDestroyQueuesOnCompletion; ///< Default flag for destroying queues on completion of test void doAddOptions(); }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestResult.h b/cpp/src/tests/storePerftools/asyncPerf/TestResult.h index 1b831c3e17..dc491b074d 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestResult.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TestResult.h @@ -32,15 +32,13 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class TestOptions; - /** - * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal. + * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in a test. * * This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the * inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the * ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput. - * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor. + * The message number and size information comes from the TestOptions object passed to the constructor. * * Results are available through the use of toStream(), toString() or the << operators. * diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp index 118707b3d3..51a5791403 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp @@ -1,16 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TransactionAsyncContext.cpp + */ + #include "TransactionAsyncContext.h" +#include <cassert> + namespace tests { namespace storePerftools { namespace asyncPerf { -TransactionAsyncContext::TransactionAsyncContext(MockTransactionContext* tc, +TransactionAsyncContext::TransactionAsyncContext(boost::shared_ptr<MockTransactionContext> tc, const qpid::asyncStore::AsyncOperation::opCode op): - qpid::broker::BrokerAsyncContext(), m_tc(tc), m_op(op) { - assert(tc != 0); + assert(m_tc.get() != 0); } TransactionAsyncContext::~TransactionAsyncContext() @@ -28,7 +52,7 @@ TransactionAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -MockTransactionContext* +boost::shared_ptr<MockTransactionContext> TransactionAsyncContext::getTransactionContext() const { return m_tc; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h index 3bce23046e..2b4f189f41 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h @@ -1,25 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TransactionAsyncContext.h + */ + #ifndef tests_storePerftools_asyncPerf_TransactionAsyncContext_h_ #define tests_storePerftools_asyncPerf_TransactionAsyncContext_h_ -#include "MockTransactionContext.h" #include "qpid/asyncStore/AsyncOperation.h" -#include "qpid/broker/BrokerAsyncContext.h" +#include "qpid/broker/AsyncStore.h" // qpid::broker::BrokerAsyncContext + +#include <boost/shared_ptr.hpp> namespace tests { namespace storePerftools { namespace asyncPerf { -class TransactionAsyncContext: public qpid::broker::BrokerAsyncContext { +class MockTransactionContext; + +class TransactionAsyncContext: public qpid::broker::BrokerAsyncContext +{ public: - TransactionAsyncContext(MockTransactionContext* tc, + TransactionAsyncContext(boost::shared_ptr<MockTransactionContext> tc, const qpid::asyncStore::AsyncOperation::opCode op); virtual ~TransactionAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - MockTransactionContext* getTransactionContext() const; + boost::shared_ptr<MockTransactionContext> getTransactionContext() const; void destroy(); + protected: - MockTransactionContext* m_tc; + boost::shared_ptr<MockTransactionContext> m_tc; const qpid::asyncStore::AsyncOperation::opCode m_op; }; diff --git a/cpp/src/tests/storePerftools/common/Thread.h b/cpp/src/tests/storePerftools/common/Thread.h index bab484dd66..74d25a9da0 100644 --- a/cpp/src/tests/storePerftools/common/Thread.h +++ b/cpp/src/tests/storePerftools/common/Thread.h @@ -27,6 +27,8 @@ #include <pthread.h> #include <string> +#include <boost/shared_ptr.hpp> + namespace tests { namespace storePerftools { namespace common { |