diff options
Diffstat (limited to 'cpp/src')
25 files changed, 455 insertions, 175 deletions
diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/TxnOp.h index e98429535e..1626e30ccd 100644 --- a/cpp/src/qpid/broker/TxnOp.h +++ b/cpp/src/qpid/broker/TxnOp.h @@ -27,6 +27,8 @@ namespace qpid { namespace broker { +class TxnHandle; + class TxnOp{ public: virtual ~TxnOp() {} diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index fbe4cfbd7d..ad9a518867 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -380,6 +380,7 @@ endif (UNIX) # Async store perf test (asyncPerf) set (asyncStorePerf_SOURCES storePerftools/asyncPerf/Deliverable.cpp + storePerftools/asyncPerf/DeliveryRecord.cpp storePerftools/asyncPerf/MessageAsyncContext.cpp storePerftools/asyncPerf/MessageConsumer.cpp storePerftools/asyncPerf/MessageDeque.cpp @@ -387,10 +388,11 @@ set (asyncStorePerf_SOURCES storePerftools/asyncPerf/PerfTest.cpp storePerftools/asyncPerf/QueueAsyncContext.cpp storePerftools/asyncPerf/QueuedMessage.cpp - storePerftools/asyncPerf/SimplePersistableMessage.cpp - storePerftools/asyncPerf/SimplePersistableQueue.cpp + storePerftools/asyncPerf/SimpleMessage.cpp + storePerftools/asyncPerf/SimpleQueue.cpp storePerftools/asyncPerf/TestOptions.cpp storePerftools/asyncPerf/TestResult.cpp + storePerftools/asyncPerf/TxnAccept.cpp storePerftools/asyncPerf/TxnPublish.cpp storePerftools/common/Parameters.cpp diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h index 57e130eeba..990d53a199 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h +++ b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h @@ -31,8 +31,8 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableMessage; -class SimplePersistableQueue; +class SimpleMessage; +class SimpleQueue; class Deliverable { @@ -41,8 +41,8 @@ public: virtual ~Deliverable(); virtual uint64_t contentSize() = 0; - virtual void deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue) = 0; - virtual SimplePersistableMessage& getMessage() = 0; + virtual void deliverTo(const boost::shared_ptr<SimpleQueue>& queue) = 0; + virtual SimpleMessage& getMessage() = 0; virtual bool isDelivered() const; protected: diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp new file mode 100644 index 0000000000..7a0224a9b5 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DeliveryRecord.cpp + */ + +#include "DeliveryRecord.h" + +#include "SimpleMessage.h" +#include "SimpleQueue.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +DeliveryRecord::DeliveryRecord(const QueuedMessage& qm, + bool accepted) : + m_queuedMessage(qm), + m_accepted(accepted), + m_ended(accepted) +{} + +DeliveryRecord::~DeliveryRecord() +{} + +bool +DeliveryRecord::accept(qpid::broker::TxnHandle* txn) +{ + if (!m_ended) { + assert(m_queuedMessage.getQueue()); + m_queuedMessage.getQueue()->dequeue(*txn, m_queuedMessage); + m_accepted = true; + setEnded(); + } + return isRedundant(); +} + +bool +DeliveryRecord::isAccepted() const +{ + return m_accepted; +} + +bool +DeliveryRecord::setEnded() +{ + m_ended = true; + m_queuedMessage.payload() = boost::intrusive_ptr<SimpleMessage>(0); + return isRedundant(); +} + +bool +DeliveryRecord::isEnded() const +{ + return m_ended; +} + +bool +DeliveryRecord::isRedundant() const +{ + return m_ended; +} + + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h new file mode 100644 index 0000000000..25b5446a5f --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file DeliveryRecord.h + */ + +#ifndef tests_storePerftools_asyncPerf_DeliveryRecord_h_ +#define tests_storePerftools_asyncPerf_DeliveryRecord_h_ + +#include "QueuedMessage.h" + +namespace qpid { +namespace broker { +class TxnHandle; +}} + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class DeliveryRecord { +public: + DeliveryRecord(const QueuedMessage& qm, + bool accepted); + virtual ~DeliveryRecord(); + bool accept(qpid::broker::TxnHandle* txn); + bool isAccepted() const; + bool setEnded(); + bool isEnded() const; + bool isRedundant() const; +private: + QueuedMessage m_queuedMessage; + bool m_accepted : 1; + bool m_ended : 1; +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_DeliveryRecord_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp index 5e161d49c8..abb6b5c657 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp @@ -22,7 +22,7 @@ */ #include "MessageAsyncContext.h" -#include "SimplePersistableMessage.h" +#include "SimpleMessage.h" #include <cassert> @@ -30,9 +30,9 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimplePersistableMessage> msg, +MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, const qpid::asyncStore::AsyncOperation::opCode op, - boost::shared_ptr<SimplePersistableQueue> q) : + boost::shared_ptr<SimpleQueue> q) : m_msg(msg), m_op(op), m_q(q) @@ -56,13 +56,13 @@ MessageAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -boost::intrusive_ptr<SimplePersistableMessage> +boost::intrusive_ptr<SimpleMessage> MessageAsyncContext::getMessage() const { return m_msg; } -boost::shared_ptr<SimplePersistableQueue> +boost::shared_ptr<SimpleQueue> MessageAsyncContext::getQueue() const { return m_q; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h index f13cd4ab64..8418c4c760 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h @@ -34,26 +34,26 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableMessage; -class SimplePersistableQueue; +class SimpleMessage; +class SimpleQueue; class MessageAsyncContext : public qpid::broker::BrokerAsyncContext { public: - MessageAsyncContext(boost::intrusive_ptr<SimplePersistableMessage> msg, + MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, const qpid::asyncStore::AsyncOperation::opCode op, - boost::shared_ptr<SimplePersistableQueue> q); + boost::shared_ptr<SimpleQueue> q); virtual ~MessageAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - boost::intrusive_ptr<SimplePersistableMessage> getMessage() const; - boost::shared_ptr<SimplePersistableQueue> getQueue() const; + boost::intrusive_ptr<SimpleMessage> getMessage() const; + boost::shared_ptr<SimpleQueue> getQueue() const; void destroy(); private: - boost::intrusive_ptr<SimplePersistableMessage> m_msg; + boost::intrusive_ptr<SimpleMessage> m_msg; const qpid::asyncStore::AsyncOperation::opCode m_op; - boost::shared_ptr<SimplePersistableQueue> m_q; + boost::shared_ptr<SimpleQueue> m_q; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 9b015fc428..1859bde947 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -23,9 +23,12 @@ #include "MessageConsumer.h" -#include "SimplePersistableQueue.h" +#include "SimpleQueue.h" #include "TestOptions.h" +#include "qpid/asyncStore/AsyncStoreImpl.h" +#include "qpid/broker/TxnBuffer.h" + #include <stdint.h> // uint32_t namespace tests { @@ -33,8 +36,12 @@ namespace storePerftools { namespace asyncPerf { MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, - boost::shared_ptr<SimplePersistableQueue> queue) : + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq, + boost::shared_ptr<SimpleQueue> queue) : m_perfTestParams(perfTestParams), + m_store(store), + m_resultQueue(arq), m_queue(queue) {} @@ -44,6 +51,13 @@ MessageConsumer::~MessageConsumer() void* MessageConsumer::runConsumers() { + const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; + uint16_t txnCnt = 0U; + qpid::broker::TxnBuffer* tb = 0; + if (useTxns) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } + uint32_t numMsgs = 0; while (numMsgs < m_perfTestParams.m_numMsgs) { if (m_queue->dispatch()) { @@ -52,6 +66,15 @@ MessageConsumer::runConsumers() ::usleep(1000); // TODO - replace this poller with condition variable } } + + if (txnCnt) { + if (m_perfTestParams.m_durable) { + tb->commitLocal(m_store); + } else { + tb->commit(); + } + } + return reinterpret_cast<void*>(0); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h index 7f5816b6a0..5404fe9f58 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h @@ -26,25 +26,37 @@ #include "boost/shared_ptr.hpp" +namespace qpid { +namespace asyncStore { +class AsyncStoreImpl; +} +namespace broker { +class AsyncResultQueue; +}} + namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableQueue; +class SimpleQueue; class TestOptions; class MessageConsumer { public: MessageConsumer(const TestOptions& perfTestParams, - boost::shared_ptr<SimplePersistableQueue> queue); + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq, + boost::shared_ptr<SimpleQueue> queue); virtual ~MessageConsumer(); void* runConsumers(); static void* startConsumers(void* ptr); private: const TestOptions& m_perfTestParams; - boost::shared_ptr<SimplePersistableQueue> m_queue; + qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncResultQueue& m_resultQueue; + boost::shared_ptr<SimpleQueue> m_queue; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 008ddf33e7..0cab537fb0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -23,8 +23,8 @@ #include "MessageProducer.h" -#include "SimplePersistableMessage.h" -#include "SimplePersistableQueue.h" +#include "SimpleMessage.h" +#include "SimpleQueue.h" #include "TestOptions.h" #include "TxnPublish.h" @@ -41,7 +41,7 @@ MessageProducer::MessageProducer(const TestOptions& perfTestParams, const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimplePersistableQueue> queue) : + boost::shared_ptr<SimpleQueue> queue) : m_perfTestParams(perfTestParams), m_msgData(msgData), m_store(store), @@ -62,7 +62,7 @@ MessageProducer::runProducers() tb = new qpid::broker::TxnBuffer(m_resultQueue); } for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) { - boost::intrusive_ptr<SimplePersistableMessage> msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); + boost::intrusive_ptr<SimpleMessage> msg(new SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); if (useTxns) { boost::shared_ptr<TxnPublish> op(new TxnPublish(msg)); op->deliverTo(m_queue); diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h index 55504164ef..7fa74a2c51 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -38,7 +38,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableQueue; +class SimpleQueue; class TestOptions; class TxnBuffer; @@ -49,7 +49,7 @@ public: const char* msgData, qpid::asyncStore::AsyncStoreImpl* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimplePersistableQueue> queue); + boost::shared_ptr<SimpleQueue> queue); virtual ~MessageProducer(); void* runProducers(); static void* startProducers(void* ptr); @@ -58,7 +58,7 @@ private: const char* m_msgData; qpid::asyncStore::AsyncStoreImpl* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - boost::shared_ptr<SimplePersistableQueue> m_queue; + boost::shared_ptr<SimpleQueue> m_queue; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 7941e761ca..e3fdd1c44d 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -25,7 +25,7 @@ #include "MessageConsumer.h" #include "MessageProducer.h" -#include "SimplePersistableQueue.h" +#include "SimpleQueue.h" #include "tests/storePerftools/version.h" #include "tests/storePerftools/common/ScopedTimer.h" @@ -87,7 +87,7 @@ PerfTest::run() reinterpret_cast<void*>(mp.get()))); threads.push_back(tp); } - boost::shared_ptr<MessageConsumer> mc(new MessageConsumer(m_testOpts, m_queueList[q])); + boost::shared_ptr<MessageConsumer> mc(new MessageConsumer(m_testOpts, m_store, m_resultQueue, m_queueList[q])); m_consumers.push_back(mc); for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mc->startConsumers, @@ -138,7 +138,7 @@ PerfTest::prepareQueues() for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - boost::shared_ptr<SimplePersistableQueue> mpq(new SimplePersistableQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); + boost::shared_ptr<SimpleQueue> mpq(new SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); mpq->asyncCreate(); m_queueList.push_back(mpq); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index e42db090d2..6cdf015f76 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -48,7 +48,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableQueue; +class SimpleQueue; class MessageConsumer; class MessageProducer; class TestOptions; @@ -72,7 +72,7 @@ private: qpid::sys::Thread m_pollingThread; qpid::broker::AsyncResultQueueImpl m_resultQueue; qpid::asyncStore::AsyncStoreImpl* m_store; - std::deque<boost::shared_ptr<SimplePersistableQueue> > m_queueList; + std::deque<boost::shared_ptr<SimpleQueue> > m_queueList; std::deque<boost::shared_ptr<MessageProducer> > m_producers; std::deque<boost::shared_ptr<MessageConsumer> > m_consumers; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp index 07a80c8a33..c1c657727b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp @@ -22,7 +22,7 @@ */ #include "QueueAsyncContext.h" -#include "SimplePersistableMessage.h" +#include "SimpleMessage.h" #include <cassert> @@ -30,7 +30,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, +QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, @@ -44,8 +44,8 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q assert(m_q.get() != 0); } -QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, - boost::intrusive_ptr<SimplePersistableMessage> msg, +QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, + boost::intrusive_ptr<SimpleMessage> msg, qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, @@ -76,13 +76,13 @@ QueueAsyncContext::getOpStr() const return qpid::asyncStore::AsyncOperation::getOpStr(m_op); } -boost::shared_ptr<SimplePersistableQueue> +boost::shared_ptr<SimpleQueue> QueueAsyncContext::getQueue() const { return m_q; } -boost::intrusive_ptr<SimplePersistableMessage> +boost::intrusive_ptr<SimpleMessage> QueueAsyncContext::getMessage() const { return m_msg; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h index b4d16fe615..112a5ab1dd 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h @@ -36,19 +36,19 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableMessage; -class SimplePersistableQueue; +class SimpleMessage; +class SimpleQueue; class QueueAsyncContext: public qpid::broker::BrokerAsyncContext { public: - QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, + QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, qpid::broker::AsyncResultQueue* const arq); - QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q, - boost::intrusive_ptr<SimplePersistableMessage> msg, + QueueAsyncContext(boost::shared_ptr<SimpleQueue> q, + boost::intrusive_ptr<SimpleMessage> msg, qpid::broker::TxnHandle& th, const qpid::asyncStore::AsyncOperation::opCode op, qpid::broker::AsyncResultCallback rcb, @@ -56,8 +56,8 @@ public: virtual ~QueueAsyncContext(); qpid::asyncStore::AsyncOperation::opCode getOpCode() const; const char* getOpStr() const; - boost::shared_ptr<SimplePersistableQueue> getQueue() const; - boost::intrusive_ptr<SimplePersistableMessage> getMessage() const; + boost::shared_ptr<SimpleQueue> getQueue() const; + boost::intrusive_ptr<SimpleMessage> getMessage() const; qpid::broker::TxnHandle getTxnHandle() const; qpid::broker::AsyncResultQueue* getAsyncResultQueue() const; qpid::broker::AsyncResultCallback getAsyncResultCallback() const; @@ -65,8 +65,8 @@ public: void destroy(); private: - boost::shared_ptr<SimplePersistableQueue> m_q; - boost::intrusive_ptr<SimplePersistableMessage> m_msg; + boost::shared_ptr<SimpleQueue> m_q; + boost::intrusive_ptr<SimpleMessage> m_msg; qpid::broker::TxnHandle m_th; const qpid::asyncStore::AsyncOperation::opCode m_op; qpid::broker::AsyncResultCallback m_rcb; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp index 8ee858587b..11af7c9466 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp @@ -23,8 +23,8 @@ #include "QueuedMessage.h" -#include "SimplePersistableMessage.h" -#include "SimplePersistableQueue.h" +#include "SimpleMessage.h" +#include "SimpleQueue.h" #include "qpid/asyncStore/AsyncStoreImpl.h" @@ -36,8 +36,8 @@ QueuedMessage::QueuedMessage() : m_queue(0) {} -QueuedMessage::QueuedMessage(SimplePersistableQueue* q, - boost::intrusive_ptr<SimplePersistableMessage> msg) : +QueuedMessage::QueuedMessage(SimpleQueue* q, + boost::intrusive_ptr<SimpleMessage> msg) : m_queue(q), m_msg(msg), m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0)) @@ -61,7 +61,13 @@ QueuedMessage::operator=(const QueuedMessage& rhs) return *this; } -boost::intrusive_ptr<SimplePersistableMessage> +SimpleQueue* +QueuedMessage::getQueue() const +{ + return m_queue; +} + +boost::intrusive_ptr<SimpleMessage> QueuedMessage::payload() const { return m_msg; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h index 896c53ab5b..12c8e4da08 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h @@ -39,19 +39,20 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableMessage; -class SimplePersistableQueue; +class SimpleMessage; +class SimpleQueue; class QueuedMessage { public: QueuedMessage(); - QueuedMessage(SimplePersistableQueue* q, - boost::intrusive_ptr<SimplePersistableMessage> msg); + QueuedMessage(SimpleQueue* q, + boost::intrusive_ptr<SimpleMessage> msg); QueuedMessage(const QueuedMessage& qm); ~QueuedMessage(); QueuedMessage& operator=(const QueuedMessage& rhs); - boost::intrusive_ptr<SimplePersistableMessage> payload() const; + SimpleQueue* getQueue() const; + boost::intrusive_ptr<SimpleMessage> payload() const; const qpid::broker::EnqueueHandle& enqHandle() const; qpid::broker::EnqueueHandle& enqHandle(); @@ -61,8 +62,8 @@ public: void abortEnqueue(); private: - SimplePersistableQueue* m_queue; - boost::intrusive_ptr<SimplePersistableMessage> m_msg; + SimpleQueue* m_queue; + boost::intrusive_ptr<SimpleMessage> m_msg; qpid::broker::EnqueueHandle m_enqHandle; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp index a9771c1442..29db6ceaf2 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp @@ -18,10 +18,10 @@ */ /** - * \file SimplePersistableMessage.cpp + * \file SimpleMessage.cpp */ -#include "SimplePersistableMessage.h" +#include "SimpleMessage.h" #include "qpid/asyncStore/AsyncStoreImpl.h" @@ -29,83 +29,83 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -SimplePersistableMessage::SimplePersistableMessage(const char* msgData, - const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store) : +SimpleMessage::SimpleMessage(const char* msgData, + const uint32_t msgSize, + qpid::asyncStore::AsyncStoreImpl* store) : m_persistenceId(0ULL), m_msg(msgData, static_cast<size_t>(msgSize)), m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0)) {} -SimplePersistableMessage::~SimplePersistableMessage() +SimpleMessage::~SimpleMessage() {} const qpid::broker::MessageHandle& -SimplePersistableMessage::getHandle() const +SimpleMessage::getHandle() const { return m_msgHandle; } qpid::broker::MessageHandle& -SimplePersistableMessage::getHandle() +SimpleMessage::getHandle() { return m_msgHandle; } uint64_t -SimplePersistableMessage::contentSize() const +SimpleMessage::contentSize() const { return static_cast<uint64_t>(m_msg.size()); } void -SimplePersistableMessage::setPersistenceId(uint64_t id) const +SimpleMessage::setPersistenceId(uint64_t id) const { m_persistenceId = id; } uint64_t -SimplePersistableMessage::getPersistenceId() const +SimpleMessage::getPersistenceId() const { return m_persistenceId; } void -SimplePersistableMessage::encode(qpid::framing::Buffer& buffer) const +SimpleMessage::encode(qpid::framing::Buffer& buffer) const { buffer.putRawData(m_msg); } uint32_t -SimplePersistableMessage::encodedSize() const +SimpleMessage::encodedSize() const { return static_cast<uint32_t>(m_msg.size()); } void -SimplePersistableMessage::allDequeuesComplete() +SimpleMessage::allDequeuesComplete() {} uint32_t -SimplePersistableMessage::encodedHeaderSize() const +SimpleMessage::encodedHeaderSize() const { return 0; } bool -SimplePersistableMessage::isPersistent() const +SimpleMessage::isPersistent() const { return m_msgHandle.isValid(); } uint64_t -SimplePersistableMessage::getSize() +SimpleMessage::getSize() { return m_msg.size(); } void -SimplePersistableMessage::write(char* target) +SimpleMessage::write(char* target) { ::memcpy(target, m_msg.data(), m_msg.size()); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h index 7ac54ddea1..1b3e034814 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h @@ -18,11 +18,11 @@ */ /** - * \file SimplePersistableMessage.h + * \file SimpleMessage.h */ -#ifndef tests_storePerftools_asyncPerf_SimplePersistableMessage_h_ -#define tests_storePerftools_asyncPerf_SimplePersistableMessage_h_ +#ifndef tests_storePerftools_asyncPerf_SimpleMessage_h_ +#define tests_storePerftools_asyncPerf_SimpleMessage_h_ #include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource #include "qpid/broker/MessageHandle.h" @@ -39,16 +39,16 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimplePersistableQueue; +class SimpleQueue; -class SimplePersistableMessage: public qpid::broker::PersistableMessage, - public qpid::broker::DataSource +class SimpleMessage: public qpid::broker::PersistableMessage, + public qpid::broker::DataSource { public: - SimplePersistableMessage(const char* msgData, - const uint32_t msgSize, - qpid::asyncStore::AsyncStoreImpl* store); - virtual ~SimplePersistableMessage(); + SimpleMessage(const char* msgData, + const uint32_t msgSize, + qpid::asyncStore::AsyncStoreImpl* store); + virtual ~SimpleMessage(); const qpid::broker::MessageHandle& getHandle() const; qpid::broker::MessageHandle& getHandle(); uint64_t contentSize() const; @@ -76,4 +76,4 @@ private: }}} // namespace tests::storePerftools::asyncPerf -#endif // tests_storePerftools_asyncPerf_SimplePersistableMessage_h_ +#endif // tests_storePerftools_asyncPerf_SimpleMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index be2b4c891b..d8b312f011 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -18,13 +18,13 @@ */ /** - * \file SimplePersistableQueue.cpp + * \file SimpleQueue.cpp */ -#include "SimplePersistableQueue.h" +#include "SimpleQueue.h" #include "MessageDeque.h" -#include "SimplePersistableMessage.h" +#include "SimpleMessage.h" #include "QueueAsyncContext.h" #include "QueuedMessage.h" @@ -37,13 +37,13 @@ namespace storePerftools { namespace asyncPerf { //static -qpid::broker::TxnHandle SimplePersistableQueue::s_nullTxnHandle; // used for non-txn operations +qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations -SimplePersistableQueue::SimplePersistableQueue(const std::string& name, - const qpid::framing::FieldTable& /*args*/, - qpid::asyncStore::AsyncStoreImpl* store, - qpid::broker::AsyncResultQueue& arq) : +SimpleQueue::SimpleQueue(const std::string& name, + const qpid::framing::FieldTable& /*args*/, + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq) : qpid::broker::PersistableQueue(), m_name(name), m_store(store), @@ -62,7 +62,7 @@ SimplePersistableQueue::SimplePersistableQueue(const std::string& name, } } -SimplePersistableQueue::~SimplePersistableQueue() +SimpleQueue::~SimpleQueue() { // m_store->flush(*this); // TODO: Make destroying the store a test parameter @@ -72,7 +72,7 @@ SimplePersistableQueue::~SimplePersistableQueue() // static void -SimplePersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh) +SimpleQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh) { if (arh) { boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); @@ -81,7 +81,7 @@ SimplePersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure " << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; } else { -//std::cout << "QQQ SimplePersistableQueue::handleAsyncResult() op=" << qc->getOpStr() << std::endl << std::flush; +//std::cout << "QQQ SimpleQueue::handleAsyncResult() op=" << qc->getOpStr() << std::endl << std::flush; // Handle async success here switch(qc->getOpCode()) { case qpid::asyncStore::AsyncOperation::QUEUE_CREATE: @@ -101,7 +101,7 @@ SimplePersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* break; default: std::ostringstream oss; - oss << "tests::storePerftools::asyncPerf::SimplePersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); + oss << "tests::storePerftools::asyncPerf::SimpleQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode(); throw qpid::Exception(oss.str()); }; } @@ -109,25 +109,25 @@ SimplePersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* } const qpid::broker::QueueHandle& -SimplePersistableQueue::getHandle() const +SimpleQueue::getHandle() const { return m_queueHandle; } qpid::broker::QueueHandle& -SimplePersistableQueue::getHandle() +SimpleQueue::getHandle() { return m_queueHandle; } qpid::asyncStore::AsyncStoreImpl* -SimplePersistableQueue::getStore() +SimpleQueue::getStore() { return m_store; } void -SimplePersistableQueue::asyncCreate() +SimpleQueue::asyncCreate() { if (m_store) { boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), @@ -143,7 +143,7 @@ SimplePersistableQueue::asyncCreate() } void -SimplePersistableQueue::asyncDestroy(const bool deleteQueue) +SimpleQueue::asyncDestroy(const bool deleteQueue) { m_destroyPending = true; if (m_store) { @@ -162,7 +162,7 @@ SimplePersistableQueue::asyncDestroy(const bool deleteQueue) } void -SimplePersistableQueue::deliver(boost::intrusive_ptr<SimplePersistableMessage> msg) +SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { QueuedMessage qm(this, msg); enqueue(s_nullTxnHandle, qm); @@ -170,7 +170,7 @@ SimplePersistableQueue::deliver(boost::intrusive_ptr<SimplePersistableMessage> m } bool -SimplePersistableQueue::dispatch() +SimpleQueue::dispatch() { QueuedMessage qm; if (m_messages->consume(qm)) { @@ -180,8 +180,8 @@ SimplePersistableQueue::dispatch() } bool -SimplePersistableQueue::enqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) +SimpleQueue::enqueue(qpid::broker::TxnHandle& th, + QueuedMessage& qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { @@ -195,8 +195,8 @@ SimplePersistableQueue::enqueue(qpid::broker::TxnHandle& th, } bool -SimplePersistableQueue::dequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) +SimpleQueue::dequeue(qpid::broker::TxnHandle& th, + QueuedMessage& qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { @@ -210,54 +210,54 @@ SimplePersistableQueue::dequeue(qpid::broker::TxnHandle& th, } void -SimplePersistableQueue::process(boost::intrusive_ptr<SimplePersistableMessage> msg) +SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { QueuedMessage qm(this, msg); push(qm); } void -SimplePersistableQueue::enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> /*msg*/) +SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage> /*msg*/) {} void -SimplePersistableQueue::encode(qpid::framing::Buffer& buffer) const +SimpleQueue::encode(qpid::framing::Buffer& buffer) const { buffer.putShortString(m_name); } uint32_t -SimplePersistableQueue::encodedSize() const +SimpleQueue::encodedSize() const { return m_name.size() + 1; } uint64_t -SimplePersistableQueue::getPersistenceId() const +SimpleQueue::getPersistenceId() const { return m_persistenceId; } void -SimplePersistableQueue::setPersistenceId(uint64_t persistenceId) const +SimpleQueue::setPersistenceId(uint64_t persistenceId) const { m_persistenceId = persistenceId; } void -SimplePersistableQueue::flush() +SimpleQueue::flush() { //if(m_store) m_store->flush(*this); } const std::string& -SimplePersistableQueue::getName() const +SimpleQueue::getName() const { return m_name; } void -SimplePersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) +SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) { if (externalQueueStore != inst && externalQueueStore) delete externalQueueStore; @@ -265,13 +265,13 @@ SimplePersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* } uint64_t -SimplePersistableQueue::getSize() +SimpleQueue::getSize() { return m_persistableData.size(); } void -SimplePersistableQueue::write(char* target) +SimpleQueue::write(char* target) { ::memcpy(target, m_persistableData.data(), m_persistableData.size()); } @@ -279,14 +279,14 @@ SimplePersistableQueue::write(char* target) // --- Members & methods in msg handling path from qpid::Queue --- // protected -SimplePersistableQueue::UsageBarrier::UsageBarrier(SimplePersistableQueue& q) : +SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) : m_parent(q), m_count(0) {} // protected bool -SimplePersistableQueue::UsageBarrier::acquire() +SimpleQueue::UsageBarrier::acquire() { qpid::sys::Monitor::ScopedLock l(m_monitor); if (m_parent.m_destroyed) { @@ -298,7 +298,7 @@ SimplePersistableQueue::UsageBarrier::acquire() } // protected -void SimplePersistableQueue::UsageBarrier::release() +void SimpleQueue::UsageBarrier::release() { qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); if (--m_count == 0) { @@ -307,7 +307,7 @@ void SimplePersistableQueue::UsageBarrier::release() } // protected -void SimplePersistableQueue::UsageBarrier::destroy() +void SimpleQueue::UsageBarrier::destroy() { qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); m_parent.m_destroyed = true; @@ -317,13 +317,13 @@ void SimplePersistableQueue::UsageBarrier::destroy() } // protected -SimplePersistableQueue::ScopedUse::ScopedUse(UsageBarrier& b) : +SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) : m_barrier(b), m_acquired(m_barrier.acquire()) {} // protected -SimplePersistableQueue::ScopedUse::~ScopedUse() +SimpleQueue::ScopedUse::~ScopedUse() { if (m_acquired) { m_barrier.release(); @@ -332,8 +332,8 @@ SimplePersistableQueue::ScopedUse::~ScopedUse() // private void -SimplePersistableQueue::push(QueuedMessage& qm, - bool /*isRecovery*/) +SimpleQueue::push(QueuedMessage& qm, + bool /*isRecovery*/) { QueuedMessage removed; m_messages->push(qm, removed); @@ -343,8 +343,8 @@ SimplePersistableQueue::push(QueuedMessage& qm, // private bool -SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) +SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th, + QueuedMessage& qm) { qm.payload()->setPersistenceId(m_store->getNextRid()); //std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; @@ -366,8 +366,8 @@ SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th, // private bool -SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th, - QueuedMessage& qm) +SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th, + QueuedMessage& qm) { //std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush; boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), @@ -388,7 +388,7 @@ SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th, // private void -SimplePersistableQueue::destroyCheck(const std::string& opDescr) const +SimpleQueue::destroyCheck(const std::string& opDescr) const { if (m_destroyPending || m_destroyed) { std::ostringstream oss; @@ -399,7 +399,7 @@ SimplePersistableQueue::destroyCheck(const std::string& opDescr) const // private void -SimplePersistableQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc) +SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc) { //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); @@ -408,7 +408,7 @@ SimplePersistableQueue::createComplete(const boost::shared_ptr<QueueAsyncContext // private void -SimplePersistableQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc) +SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc) { //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); @@ -417,7 +417,7 @@ SimplePersistableQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> // private void -SimplePersistableQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc) +SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc) { //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush; assert(qc->getQueue().get() == this); @@ -427,7 +427,7 @@ SimplePersistableQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContex // private void -SimplePersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) +SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) { //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; assert(qc->getQueue().get() == this); @@ -441,7 +441,7 @@ SimplePersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContex // private void -SimplePersistableQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) +SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) { //std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush; assert(qc->getQueue().get() == this); diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h index 34ef9407ac..bc9dda0d98 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h @@ -18,11 +18,11 @@ */ /** - * \file SimplePersistableQueue.h + * \file SimpleQueue.h */ -#ifndef tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ -#define tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ +#ifndef tests_storePerftools_asyncPerf_SimpleQueue_h_ +#define tests_storePerftools_asyncPerf_SimpleQueue_h_ #include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter #include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource @@ -51,20 +51,20 @@ namespace storePerftools { namespace asyncPerf { class Messages; -class SimplePersistableMessage; +class SimpleMessage; class QueueAsyncContext; class QueuedMessage; -class SimplePersistableQueue : public boost::enable_shared_from_this<SimplePersistableQueue>, - public qpid::broker::PersistableQueue, - public qpid::broker::DataSource +class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>, + public qpid::broker::PersistableQueue, + public qpid::broker::DataSource { public: - SimplePersistableQueue(const std::string& name, - const qpid::framing::FieldTable& args, - qpid::asyncStore::AsyncStoreImpl* store, - qpid::broker::AsyncResultQueue& arq); - virtual ~SimplePersistableQueue(); + SimpleQueue(const std::string& name, + const qpid::framing::FieldTable& args, + qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncResultQueue& arq); + virtual ~SimpleQueue(); static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res); const qpid::broker::QueueHandle& getHandle() const; @@ -75,14 +75,14 @@ public: void asyncDestroy(const bool deleteQueue); // --- Methods in msg handling path from qpid::Queue --- - void deliver(boost::intrusive_ptr<SimplePersistableMessage> msg); + void deliver(boost::intrusive_ptr<SimpleMessage> msg); bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param bool enqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm); bool dequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm); - void process(boost::intrusive_ptr<SimplePersistableMessage> msg); - void enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> msg); + void process(boost::intrusive_ptr<SimpleMessage> msg); + void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg); // --- Interface qpid::broker::Persistable --- virtual void encode(qpid::framing::Buffer& buffer) const; @@ -115,10 +115,10 @@ private: // --- Members & methods in msg handling path copied from qpid::Queue --- struct UsageBarrier { - SimplePersistableQueue& m_parent; + SimpleQueue& m_parent; uint32_t m_count; qpid::sys::Monitor m_monitor; - UsageBarrier(SimplePersistableQueue& q); + UsageBarrier(SimpleQueue& q); bool acquire(); void release(); void destroy(); @@ -154,4 +154,4 @@ private: }}} // namespace tests::storePerftools::asyncPerf -#endif // tests_storePerftools_asyncPerf_SimplePersistableQueue_h_ +#endif // tests_storePerftools_asyncPerf_SimpleQueue_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp new file mode 100644 index 0000000000..c1d35805a6 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnAccept.cpp + */ + +#include "TxnAccept.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +TxnAccept::TxnAccept() +{} + +TxnAccept::~TxnAccept() +{} + +// --- Interface TxnOp --- + +bool +TxnAccept::prepare(qpid::broker::TxnHandle& /*th*/) throw() +{ + return false; +} + +void +TxnAccept::commit() throw() +{} + +void +TxnAccept::rollback() throw() +{} + +}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h new file mode 100644 index 0000000000..f164a4c965 --- /dev/null +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file TxnAccept.h + */ + +#ifndef tests_storePerftools_asyncPerf_TxnAccept_h_ +#define tests_storePerftools_asyncPerf_TxnAccept_h_ + +#include "qpid/broker/TxnOp.h" + +namespace tests { +namespace storePerftools { +namespace asyncPerf { + +class TxnAccept: public qpid::broker::TxnOp { +public: + TxnAccept(); + virtual ~TxnAccept(); + + // --- Interface TxnOp --- + bool prepare(qpid::broker::TxnHandle& th) throw(); + void commit() throw(); + void rollback() throw(); +}; + +}}} // namespace tests::storePerftools::asyncPerf + +#endif // tests_storePerftools_asyncPerf_TxnAccept_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp index eff7646de6..2927dc60e2 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp @@ -21,8 +21,8 @@ * \file TxnPublish.cpp */ -#include "SimplePersistableMessage.h" -#include "SimplePersistableQueue.h" // debug msg +#include "SimpleMessage.h" +#include "SimpleQueue.h" // debug msg #include "TxnPublish.h" #include "QueuedMessage.h" @@ -31,7 +31,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -TxnPublish::TxnPublish(boost::intrusive_ptr<SimplePersistableMessage> msg) : +TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) : m_msg(msg) { //std::cout << "TTT new TxnPublish" << std::endl << std::flush; @@ -96,7 +96,7 @@ TxnPublish::contentSize() } void -TxnPublish::deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue) +TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) { //std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush; boost::shared_ptr<QueuedMessage> qm(new QueuedMessage(queue.get(), m_msg)); @@ -104,7 +104,7 @@ TxnPublish::deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue) m_delivered = true; } -SimplePersistableMessage& +SimpleMessage& TxnPublish::getMessage() { return *m_msg; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h index 6e97e99349..a7255314bd 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h @@ -34,9 +34,7 @@ namespace qpid { namespace broker { - class TransactionContext; - }} namespace tests { @@ -44,14 +42,14 @@ namespace storePerftools { namespace asyncPerf { class QueuedMessage; -class SimplePersistableMessage; -class SimplePersistableQueue; +class SimpleMessage; +class SimpleQueue; class TxnPublish : public qpid::broker::TxnOp, public Deliverable { public: - TxnPublish(boost::intrusive_ptr<SimplePersistableMessage> msg); + TxnPublish(boost::intrusive_ptr<SimpleMessage> msg); virtual ~TxnPublish(); // --- Interface TxOp --- @@ -61,11 +59,11 @@ public: // --- Interface Deliverable --- uint64_t contentSize(); - void deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue); - SimplePersistableMessage& getMessage(); + void deliverTo(const boost::shared_ptr<SimpleQueue>& queue); + SimpleMessage& getMessage(); private: - boost::intrusive_ptr<SimplePersistableMessage> m_msg; + boost::intrusive_ptr<SimpleMessage> m_msg; std::list<boost::shared_ptr<QueuedMessage> > m_queues; std::list<boost::shared_ptr<QueuedMessage> > m_prepared; }; |