diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/asyncstore.cmake | 11 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/AsyncStoreImpl.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/TxnHandleImpl.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/asyncStore/TxnHandleImpl.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AsyncStore.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueAsyncContext.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueAsyncContext.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleConsumer.h | 42 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleDeliverable.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp) | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleDeliverable.h (renamed from cpp/src/tests/storePerftools/asyncPerf/Deliverable.h) | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleDeliveryRecord.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp) | 52 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleDeliveryRecord.h (renamed from cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h) | 41 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleMessage.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp) | 56 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleMessage.h (renamed from cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h) | 33 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp) | 27 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleMessageAsyncContext.h (renamed from cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h) | 25 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleMessageDeque.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp) | 28 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleMessageDeque.h (renamed from cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h) | 32 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleMessages.h (renamed from cpp/src/tests/storePerftools/asyncPerf/Messages.h) | 25 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleQueue.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp) | 155 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleQueue.h (renamed from cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h) | 99 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleQueuedMessage.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp) | 57 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleQueuedMessage.h (renamed from cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h) | 40 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleTxnAccept.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp) | 27 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleTxnAccept.h (renamed from cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h) | 29 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleTxnBuffer.cpp (renamed from cpp/src/qpid/broker/TxnBuffer.cpp) | 52 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleTxnBuffer.h (renamed from cpp/src/qpid/broker/TxnBuffer.h) | 22 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleTxnOp.h (renamed from cpp/src/qpid/broker/TxnOp.h) | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleTxnPublish.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp) | 35 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SimpleTxnPublish.h (renamed from cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h) | 37 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnAsyncContext.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxnAsyncContext.h | 12 | ||||
-rw-r--r-- | cpp/src/tests/asyncstore.cmake | 18 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp | 25 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h | 24 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp | 23 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h | 16 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp | 36 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/PerfTest.h | 9 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp | 9 | ||||
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp | 6 |
42 files changed, 576 insertions, 632 deletions
diff --git a/cpp/src/asyncstore.cmake b/cpp/src/asyncstore.cmake index 5656bc8cc4..6171ed5505 100644 --- a/cpp/src/asyncstore.cmake +++ b/cpp/src/asyncstore.cmake @@ -64,8 +64,17 @@ set (asyncStore_SOURCES qpid/broker/MessageHandle.cpp qpid/broker/QueueAsyncContext.cpp qpid/broker/QueueHandle.cpp + qpid/broker/SimpleDeliverable.cpp + qpid/broker/SimpleDeliveryRecord.cpp + qpid/broker/SimpleMessage.cpp + qpid/broker/SimpleMessageAsyncContext.cpp + qpid/broker/SimpleMessageDeque.cpp + qpid/broker/SimpleQueue.cpp + qpid/broker/SimpleQueuedMessage.cpp + qpid/broker/SimpleTxnAccept.cpp + qpid/broker/SimpleTxnBuffer.cpp + qpid/broker/SimpleTxnPublish.cpp qpid/broker/TxnAsyncContext.cpp - qpid/broker/TxnBuffer.cpp qpid/broker/TxnHandle.cpp ) diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index aa66e7adb8..2ee1d23025 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -75,7 +75,7 @@ AsyncStoreImpl::createTxnHandle() } qpid::broker::TxnHandle -AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb) +AsyncStoreImpl::createTxnHandle(qpid::broker::SimpleTxnBuffer* tb) { return qpid::broker::TxnHandle(new TxnHandleImpl(tb)); } @@ -90,7 +90,7 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid, qpid::broker::TxnHandle AsyncStoreImpl::createTxnHandle(const std::string& xid, const bool tpcFlag, - qpid::broker::TxnBuffer* tb) + qpid::broker::SimpleTxnBuffer* tb) { return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tpcFlag, tb)); } diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index eb3f090ad7..40a7552a68 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -42,8 +42,7 @@ class Poller; namespace asyncStore { -class AsyncStoreImpl : public qpid::broker::AsyncTransactionalStore, - public qpid::broker::AsyncStore +class AsyncStoreImpl : public qpid::broker::AsyncStore { public: AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller, @@ -59,12 +58,12 @@ public: // --- Interface from AsyncTransactionalStore --- qpid::broker::TxnHandle createTxnHandle(); - qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb); + qpid::broker::TxnHandle createTxnHandle(qpid::broker::SimpleTxnBuffer* tb); qpid::broker::TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag); qpid::broker::TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag, - qpid::broker::TxnBuffer* tb); + qpid::broker::SimpleTxnBuffer* tb); void submitPrepare(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt); diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index dd644b29bd..50dce1b2af 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -31,7 +31,7 @@ TxnHandleImpl::TxnHandleImpl() : m_txnBuffer(0) {} -TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) : +TxnHandleImpl::TxnHandleImpl(qpid::broker::SimpleTxnBuffer* tb) : m_tpcFlag(false), m_txnBuffer(tb) {} @@ -44,7 +44,7 @@ TxnHandleImpl::TxnHandleImpl(const std::string& xid, const bool tpcFlag) : TxnHandleImpl::TxnHandleImpl(const std::string& xid, const bool tpcFlag, - qpid::broker::TxnBuffer* tb) : + qpid::broker::SimpleTxnBuffer* tb) : m_xid(xid), m_tpcFlag(tpcFlag), m_txnBuffer(tb) diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h index e1f8afff3e..ce23665d5b 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -33,7 +33,7 @@ namespace qpid { namespace broker { -class TxnBuffer; +class SimpleTxnBuffer; } namespace asyncStore { @@ -42,9 +42,9 @@ class TxnHandleImpl : public virtual qpid::RefCounted { public: TxnHandleImpl(); - TxnHandleImpl(qpid::broker::TxnBuffer* tb); + TxnHandleImpl(qpid::broker::SimpleTxnBuffer* tb); TxnHandleImpl(const std::string& xid, const bool tpcFlag); - TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::TxnBuffer* tb); + TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::SimpleTxnBuffer* tb); virtual ~TxnHandleImpl(); const std::string& getXid() const; bool is2pc() const; @@ -52,7 +52,7 @@ public: private: std::string m_xid; bool m_tpcFlag; - qpid::broker::TxnBuffer* const m_txnBuffer; + qpid::broker::SimpleTxnBuffer* const m_txnBuffer; }; }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 6f1c02e059..7009565a7c 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -70,19 +70,19 @@ class TxnHandle; class QueueAsyncContext; class TpcTxnAsyncContext; class TxnAsyncContext; -class TxnBuffer; +class SimpleTxnBuffer; class AsyncTransactionalStore { public: virtual ~AsyncTransactionalStore() {} virtual TxnHandle createTxnHandle() = 0; - virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0; + virtual TxnHandle createTxnHandle(SimpleTxnBuffer* tb) = 0; virtual TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag) = 0; virtual TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag, - TxnBuffer* tb) = 0; + SimpleTxnBuffer* tb) = 0; virtual void submitPrepare(TxnHandle&, boost::shared_ptr<TpcTxnAsyncContext>) = 0; // Distributed txns only @@ -94,7 +94,7 @@ public: }; // Subclassed by store: -class AsyncStore { +class AsyncStore : public AsyncTransactionalStore { public: virtual ~AsyncStore() {} diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp index 4bd2d271eb..02eb2e9546 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.cpp +++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp @@ -48,7 +48,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, {} QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), @@ -61,7 +61,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, boost::intrusive_ptr<PersistableMessage> msg, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), @@ -89,7 +89,7 @@ QueueAsyncContext::getMessage() const return m_msg; } -TxnBuffer* +SimpleTxnBuffer* QueueAsyncContext::getTxnBuffer() const { return m_tb; } diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h index e9ba2ebbac..8657922377 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.h +++ b/cpp/src/qpid/broker/QueueAsyncContext.h @@ -52,18 +52,18 @@ public: AsyncResultCallback rcb, AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr<PersistableQueue> q, boost::intrusive_ptr<PersistableMessage> msg, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); virtual ~QueueAsyncContext(); boost::shared_ptr<PersistableQueue> getQueue() const; boost::intrusive_ptr<PersistableMessage> getMessage() const; - TxnBuffer* getTxnBuffer() const; + SimpleTxnBuffer* getTxnBuffer() const; AsyncResultQueue* getAsyncResultQueue() const; AsyncResultCallback getAsyncResultCallback() const; void invokeCallback(const AsyncResultHandle* const arh) const; @@ -72,7 +72,7 @@ public: private: boost::shared_ptr<PersistableQueue> m_q; boost::intrusive_ptr<PersistableMessage> m_msg; - TxnBuffer* m_tb; + SimpleTxnBuffer* m_tb; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; diff --git a/cpp/src/qpid/broker/SimpleConsumer.h b/cpp/src/qpid/broker/SimpleConsumer.h new file mode 100644 index 0000000000..6601c65a42 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleConsumer.h @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleConsumer.h + */ + +#ifndef qpid_broker_SimpleConsumer_h_ +#define qpid_broker_SimpleConsumer_h_ + +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { +class SimpleDeliveryRecord; + +class SimpleConsumer { +public: + virtual ~SimpleConsumer() {} + virtual void commitComplete() = 0; + virtual void record(boost::shared_ptr<SimpleDeliveryRecord> dr) = 0; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleConsumer_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp b/cpp/src/qpid/broker/SimpleDeliverable.cpp index 9da7e348e0..7037a377c5 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp +++ b/cpp/src/qpid/broker/SimpleDeliverable.cpp @@ -18,26 +18,23 @@ */ /** - * \file Deliverable.cpp + * \file SimpleDeliverable.cpp */ -#include "Deliverable.h" +#include "SimpleDeliverable.h" -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { -Deliverable::Deliverable() : +SimpleDeliverable::SimpleDeliverable() : m_delivered(false) {} -Deliverable::~Deliverable() -{} +SimpleDeliverable::~SimpleDeliverable() {} bool -Deliverable::isDelivered() const -{ +SimpleDeliverable::isDelivered() const { return m_delivered; } -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h b/cpp/src/qpid/broker/SimpleDeliverable.h index 990d53a199..6441e14841 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h +++ b/cpp/src/qpid/broker/SimpleDeliverable.h @@ -18,27 +18,26 @@ */ /** - * \file Deliverable.h + * \file SimpleDeliverable.h */ -#ifndef tests_storePerftools_asyncPerf_Deliverable_h_ -#define tests_storePerftools_asyncPerf_Deliverable_h_ +#ifndef qpid_broker_SimpleDeliverable_h_ +#define qpid_broker_SimpleDeliverable_h_ #include <boost/shared_ptr.hpp> #include <stdint.h> // uint64_t -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { class SimpleMessage; class SimpleQueue; -class Deliverable +class SimpleDeliverable { public: - Deliverable(); - virtual ~Deliverable(); + SimpleDeliverable(); + virtual ~SimpleDeliverable(); virtual uint64_t contentSize() = 0; virtual void deliverTo(const boost::shared_ptr<SimpleQueue>& queue) = 0; @@ -49,6 +48,6 @@ protected: bool m_delivered; }; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker -#endif // tests_storePerftools_asyncPerf_Deliverable_h_ +#endif // qpid_broker_SimpleDeliverable_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp index 6f33369a26..b71df6975b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp @@ -18,35 +18,32 @@ */ /** - * \file DeliveryRecord.cpp + * \file SimpleDeliveryRecord.cpp */ -#include "DeliveryRecord.h" +#include "SimpleDeliveryRecord.h" -#include "MessageConsumer.h" -#include "QueuedMessage.h" +#include "SimpleConsumer.h" #include "SimpleMessage.h" #include "SimpleQueue.h" +#include "SimpleQueuedMessage.h" -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { -DeliveryRecord::DeliveryRecord(boost::shared_ptr<QueuedMessage> qm, - MessageConsumer& mc, - bool accepted) : +SimpleDeliveryRecord::SimpleDeliveryRecord(boost::shared_ptr<SimpleQueuedMessage> qm, + SimpleConsumer& sc, + bool accepted) : m_queuedMessage(qm), - m_msgConsumer(mc), + m_msgConsumer(sc), m_accepted(accepted), m_ended(accepted) {} -DeliveryRecord::~DeliveryRecord() -{} +SimpleDeliveryRecord::~SimpleDeliveryRecord() {} bool -DeliveryRecord::accept() -{ +SimpleDeliveryRecord::accept() { if (!m_ended) { m_queuedMessage->getQueue()->dequeue(m_queuedMessage); m_accepted = true; @@ -56,47 +53,40 @@ DeliveryRecord::accept() } bool -DeliveryRecord::isAccepted() const -{ +SimpleDeliveryRecord::isAccepted() const { return m_accepted; } bool -DeliveryRecord::setEnded() -{ +SimpleDeliveryRecord::setEnded() { m_ended = true; m_queuedMessage->payload() = boost::intrusive_ptr<SimpleMessage>(0); return isRedundant(); } bool -DeliveryRecord::isEnded() const -{ +SimpleDeliveryRecord::isEnded() const { return m_ended; } bool -DeliveryRecord::isRedundant() const -{ +SimpleDeliveryRecord::isRedundant() const { return m_ended; } void -DeliveryRecord::dequeue(qpid::broker::TxnBuffer* tb) -{ +SimpleDeliveryRecord::dequeue(qpid::broker::SimpleTxnBuffer* tb) { m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage); } void -DeliveryRecord::committed() const -{ +SimpleDeliveryRecord::committed() const { m_msgConsumer.commitComplete(); } -boost::shared_ptr<QueuedMessage> -DeliveryRecord::getQueuedMessage() const -{ +boost::shared_ptr<SimpleQueuedMessage> +SimpleDeliveryRecord::getQueuedMessage() const { return m_queuedMessage; } -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/qpid/broker/SimpleDeliveryRecord.h index 6c5d87f374..622ce578d7 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h +++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.h @@ -18,49 +18,42 @@ */ /** - * \file DeliveryRecord.h + * \file SimpleDeliveryRecord.h */ -#ifndef tests_storePerftools_asyncPerf_DeliveryRecord_h_ -#define tests_storePerftools_asyncPerf_DeliveryRecord_h_ - -//#include "QueuedMessage.h" +#ifndef qpid_broker_SimpleDeliveryRecord_h_ +#define qpid_broker_SimpleDeliveryRecord_h_ #include <boost/shared_ptr.hpp> namespace qpid { namespace broker { -class TxnBuffer; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { -class MessageConsumer; -class QueuedMessage; +class SimpleConsumer; +class SimpleQueuedMessage; +class SimpleTxnBuffer; -class DeliveryRecord { +class SimpleDeliveryRecord { public: - DeliveryRecord(boost::shared_ptr<QueuedMessage> qm, - MessageConsumer& mc, - bool accepted); - virtual ~DeliveryRecord(); + SimpleDeliveryRecord(boost::shared_ptr<SimpleQueuedMessage> qm, + SimpleConsumer& sc, + bool accepted); + virtual ~SimpleDeliveryRecord(); bool accept(); bool isAccepted() const; bool setEnded(); bool isEnded() const; bool isRedundant() const; - void dequeue(qpid::broker::TxnBuffer* tb); + void dequeue(qpid::broker::SimpleTxnBuffer* tb); void committed() const; - boost::shared_ptr<QueuedMessage> getQueuedMessage() const; + boost::shared_ptr<SimpleQueuedMessage> getQueuedMessage() const; private: - boost::shared_ptr<QueuedMessage> m_queuedMessage; - MessageConsumer& m_msgConsumer; + boost::shared_ptr<SimpleQueuedMessage> m_queuedMessage; + SimpleConsumer& m_msgConsumer; bool m_accepted : 1; bool m_ended : 1; }; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker -#endif // tests_storePerftools_asyncPerf_DeliveryRecord_h_ +#endif // qpid_broker_SimpleDeliveryRecord_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp b/cpp/src/qpid/broker/SimpleMessage.cpp index bacf438b9f..1239533edf 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp +++ b/cpp/src/qpid/broker/SimpleMessage.cpp @@ -25,98 +25,84 @@ #include <string.h> // memcpy() -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { SimpleMessage::SimpleMessage(const char* msgData, const uint32_t msgSize) : m_persistenceId(0ULL), m_msg(msgData, static_cast<size_t>(msgSize)), m_store(0), - m_msgHandle(qpid::broker::MessageHandle()) + m_msgHandle(MessageHandle()) {} SimpleMessage::SimpleMessage(const char* msgData, const uint32_t msgSize, - qpid::broker::AsyncStore* store) : + AsyncStore* store) : m_persistenceId(0ULL), m_msg(msgData, static_cast<size_t>(msgSize)), m_store(store), - m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle()) + m_msgHandle(store ? store->createMessageHandle(this) : MessageHandle()) {} -SimpleMessage::~SimpleMessage() -{} +SimpleMessage::~SimpleMessage() {} -const qpid::broker::MessageHandle& -SimpleMessage::getHandle() const -{ +const MessageHandle& +SimpleMessage::getHandle() const { return m_msgHandle; } -qpid::broker::MessageHandle& -SimpleMessage::getHandle() -{ +MessageHandle& +SimpleMessage::getHandle() { return m_msgHandle; } uint64_t -SimpleMessage::contentSize() const -{ +SimpleMessage::contentSize() const { return static_cast<uint64_t>(m_msg.size()); } void -SimpleMessage::setPersistenceId(uint64_t id) const -{ +SimpleMessage::setPersistenceId(uint64_t id) const { m_persistenceId = id; } uint64_t -SimpleMessage::getPersistenceId() const -{ +SimpleMessage::getPersistenceId() const { return m_persistenceId; } void -SimpleMessage::encode(qpid::framing::Buffer& buffer) const -{ +SimpleMessage::encode(qpid::framing::Buffer& buffer) const { buffer.putRawData(m_msg); } uint32_t -SimpleMessage::encodedSize() const -{ +SimpleMessage::encodedSize() const { return static_cast<uint32_t>(m_msg.size()); } void -SimpleMessage::allDequeuesComplete() -{} +SimpleMessage::allDequeuesComplete() {} uint32_t -SimpleMessage::encodedHeaderSize() const -{ +SimpleMessage::encodedHeaderSize() const { return 0; } bool -SimpleMessage::isPersistent() const -{ +SimpleMessage::isPersistent() const { return m_store != 0; } uint64_t -SimpleMessage::getSize() -{ +SimpleMessage::getSize() { return m_msg.size(); } void -SimpleMessage::write(char* target) -{ +SimpleMessage::write(char* target) { ::memcpy(target, m_msg.data(), m_msg.size()); } -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h b/cpp/src/qpid/broker/SimpleMessage.h index 169f5a8959..edfaa8d13b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h +++ b/cpp/src/qpid/broker/SimpleMessage.h @@ -21,29 +21,28 @@ * \file SimpleMessage.h */ -#ifndef tests_storePerftools_asyncPerf_SimpleMessage_h_ -#define tests_storePerftools_asyncPerf_SimpleMessage_h_ +#ifndef qpid_broker_SimpleMessage_h_ +#define qpid_broker_SimpleMessage_h_ -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/MessageHandle.h" -#include "qpid/broker/PersistableMessage.h" +#include "AsyncStore.h" // DataSource +#include "MessageHandle.h" +#include "PersistableMessage.h" -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { -class SimpleMessage: public qpid::broker::PersistableMessage, - public qpid::broker::DataSource +class SimpleMessage: public PersistableMessage, + public DataSource { public: SimpleMessage(const char* msgData, const uint32_t msgSize); SimpleMessage(const char* msgData, const uint32_t msgSize, - qpid::broker::AsyncStore* store); + AsyncStore* store); virtual ~SimpleMessage(); - const qpid::broker::MessageHandle& getHandle() const; - qpid::broker::MessageHandle& getHandle(); + const MessageHandle& getHandle() const; + MessageHandle& getHandle(); uint64_t contentSize() const; // --- Interface Persistable --- @@ -64,11 +63,11 @@ public: private: mutable uint64_t m_persistenceId; const std::string m_msg; - qpid::broker::AsyncStore* m_store; + AsyncStore* m_store; - qpid::broker::MessageHandle m_msgHandle; + MessageHandle m_msgHandle; }; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker -#endif // tests_storePerftools_asyncPerf_SimpleMessage_h_ +#endif // qpid_broker_SimpleMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp index e3bfe9ae7a..a88258f5bc 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp +++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp @@ -18,21 +18,20 @@ */ /** - * \file MessageContext.cpp + * \file SimpleMessageAsyncContext.cpp */ -#include "MessageAsyncContext.h" +#include "SimpleMessageAsyncContext.h" #include "SimpleMessage.h" #include <cassert> -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { -MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, - boost::shared_ptr<SimpleQueue> q) : +SimpleMessageAsyncContext::SimpleMessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, + boost::shared_ptr<SimpleQueue> q) : m_msg(msg), m_q(q) { @@ -40,25 +39,21 @@ MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg assert(m_q.get() != 0); } -MessageAsyncContext::~MessageAsyncContext() -{} +SimpleMessageAsyncContext::~SimpleMessageAsyncContext() {} boost::intrusive_ptr<SimpleMessage> -MessageAsyncContext::getMessage() const -{ +SimpleMessageAsyncContext::getMessage() const { return m_msg; } boost::shared_ptr<SimpleQueue> -MessageAsyncContext::getQueue() const -{ +SimpleMessageAsyncContext::getQueue() const { return m_q; } void -MessageAsyncContext::destroy() -{ +SimpleMessageAsyncContext::destroy() { delete this; } -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h index 9252fbda45..e3975e790e 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h +++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h @@ -18,30 +18,29 @@ */ /** - * \file MessageContext.h + * \file SimpleMessageAsyncContext.h */ -#ifndef tests_storePerfTools_asyncPerf_MessageContext_h_ -#define tests_storePerfTools_asyncPerf_MessageContext_h_ +#ifndef qpid_broker_SimpleMessageAsyncContext_h_ +#define qpid_broker_SimpleMessageAsyncContext_h_ -#include "qpid/asyncStore/AsyncOperation.h" +#include "AsyncStore.h" // BrokerAsyncContext #include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { class SimpleMessage; class SimpleQueue; -class MessageAsyncContext : public qpid::broker::BrokerAsyncContext +class SimpleMessageAsyncContext : public BrokerAsyncContext { public: - MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, - boost::shared_ptr<SimpleQueue> q); - virtual ~MessageAsyncContext(); + SimpleMessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, + boost::shared_ptr<SimpleQueue> q); + virtual ~SimpleMessageAsyncContext(); boost::intrusive_ptr<SimpleMessage> getMessage() const; boost::shared_ptr<SimpleQueue> getQueue() const; void destroy(); @@ -51,6 +50,6 @@ private: boost::shared_ptr<SimpleQueue> m_q; }; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker -#endif // tests_storePerfTools_asyncPerf_MessageContext_h_ +#endif // qpid_broker_SimpleMessageAsyncContext_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/qpid/broker/SimpleMessageDeque.cpp index 1fa2c087ac..0aadcfd94a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp +++ b/cpp/src/qpid/broker/SimpleMessageDeque.cpp @@ -18,41 +18,35 @@ */ /** - * \file MessageDeque.cpp + * \file SimpleMessageDeque.cpp */ -#include "MessageDeque.h" +#include "SimpleMessageDeque.h" -#include "QueuedMessage.h" +#include "SimpleQueuedMessage.h" -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { -MessageDeque::MessageDeque() -{} +SimpleMessageDeque::SimpleMessageDeque() {} -MessageDeque::~MessageDeque() -{} +SimpleMessageDeque::~SimpleMessageDeque() {} uint32_t -MessageDeque::size() -{ +SimpleMessageDeque::size() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); return m_messages.size(); } bool -MessageDeque::push(boost::shared_ptr<QueuedMessage>& added) -{ +SimpleMessageDeque::push(boost::shared_ptr<SimpleQueuedMessage>& added) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); m_messages.push_back(added); return false; } bool -MessageDeque::consume(boost::shared_ptr<QueuedMessage>& msg) -{ +SimpleMessageDeque::consume(boost::shared_ptr<SimpleQueuedMessage>& msg) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); if (!m_messages.empty()) { msg = m_messages.front(); @@ -62,4 +56,4 @@ MessageDeque::consume(boost::shared_ptr<QueuedMessage>& msg) return false; } -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/qpid/broker/SimpleMessageDeque.h index 021015f3e0..5db0755a43 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h +++ b/cpp/src/qpid/broker/SimpleMessageDeque.h @@ -18,42 +18,40 @@ */ /** - * \file MessageDeque.h + * \file SimpleMessageDeque.h */ /* * This is a copy of qpid::broker::MessageDeque.h, but using the local - * tests::storePerftools::asyncPerf::QueuedMessage class instead of - * qpid::broker::QueuedMessage. + * SimpleQueuedMessage class instead of QueuedMessage. */ -#ifndef tests_storePerftools_asyncPerf_MessageDeque_h_ -#define tests_storePerftools_asyncPerf_MessageDeque_h_ +#ifndef qpid_broker_SimpleMessageDeque_h_ +#define qpid_broker_SimpleMessageDeque_h_ -#include "Messages.h" +#include "SimpleMessages.h" #include "qpid/sys/Mutex.h" #include <deque> -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { -class MessageDeque : public Messages +class SimpleMessageDeque : public SimpleMessages { public: - MessageDeque(); - virtual ~MessageDeque(); + SimpleMessageDeque(); + virtual ~SimpleMessageDeque(); uint32_t size(); - bool push(boost::shared_ptr<QueuedMessage>& added); - bool consume(boost::shared_ptr<QueuedMessage>& msg); + bool push(boost::shared_ptr<SimpleQueuedMessage>& added); + bool consume(boost::shared_ptr<SimpleQueuedMessage>& msg); private: - std::deque<boost::shared_ptr<QueuedMessage> > m_messages; + std::deque<boost::shared_ptr<SimpleQueuedMessage> > m_messages; qpid::sys::Mutex m_msgMutex; }; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker -#endif // tests_storePerftools_asyncPerf_MessageDeque_h_ +#endif // qpid_broker_SimpleMessageDeque_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/qpid/broker/SimpleMessages.h index c1bfa328ea..2a40859032 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/Messages.h +++ b/cpp/src/qpid/broker/SimpleMessages.h @@ -18,7 +18,7 @@ */ /** - * \file Messages.h + * \file SimpleMessages.h */ /* @@ -27,27 +27,26 @@ * qpid::broker::QueuedMessage. */ -#ifndef tests_storePerftools_asyncPerf_Messages_h_ -#define tests_storePerftools_asyncPerf_Messages_h_ +#ifndef qpid_broker_SimpleMessages_h_ +#define qpid_broker_SimpleMessages_h_ #include <boost/shared_ptr.hpp> #include <stdint.h> -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { -class QueuedMessage; +class SimpleQueuedMessage; -class Messages +class SimpleMessages { public: - virtual ~Messages() {} + virtual ~SimpleMessages() {} virtual uint32_t size() = 0; - virtual bool push(boost::shared_ptr<QueuedMessage>& added) = 0; - virtual bool consume(boost::shared_ptr<QueuedMessage>& msg) = 0; + virtual bool push(boost::shared_ptr<SimpleQueuedMessage>& added) = 0; + virtual bool consume(boost::shared_ptr<SimpleQueuedMessage>& msg) = 0; }; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker -#endif // tests_storePerftools_asyncPerf_Messages_h_ +#endif // qpid_broker_SimpleMessages_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/qpid/broker/SimpleQueue.cpp index 06b4e9333f..5cd8841f94 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/qpid/broker/SimpleQueue.cpp @@ -23,31 +23,29 @@ #include "SimpleQueue.h" -#include "DeliveryRecord.h" -#include "MessageConsumer.h" -#include "MessageDeque.h" -#include "QueuedMessage.h" +#include "AsyncResultHandle.h" +#include "QueueAsyncContext.h" +#include "SimpleConsumer.h" +#include "SimpleDeliveryRecord.h" #include "SimpleMessage.h" - -#include "qpid/broker/AsyncResultHandle.h" -#include "qpid/broker/QueueAsyncContext.h" -#include "qpid/broker/TxnBuffer.h" +#include "SimpleMessageDeque.h" +#include "SimpleQueuedMessage.h" +#include "SimpleTxnBuffer.h" #include <string.h> // memcpy() -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { //static -qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations +TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations SimpleQueue::SimpleQueue(const std::string& name, const qpid::framing::FieldTable& /*args*/, - qpid::broker::AsyncStore* store, - qpid::broker::AsyncResultQueue& arq) : - qpid::broker::PersistableQueue(), + AsyncStore* store, + AsyncResultQueue& arq) : + PersistableQueue(), m_name(name), m_store(store), m_resultQueue(arq), @@ -57,7 +55,7 @@ SimpleQueue::SimpleQueue(const std::string& name, m_destroyPending(false), m_destroyed(false), m_barrier(*this), - m_messages(new MessageDeque()) + m_messages(new SimpleMessageDeque()) { if (m_store != 0) { const qpid::types::Variant::Map qo; @@ -67,17 +65,17 @@ SimpleQueue::SimpleQueue(const std::string& name, SimpleQueue::~SimpleQueue() {} -const qpid::broker::QueueHandle& +const QueueHandle& SimpleQueue::getHandle() const { return m_queueHandle; } -qpid::broker::QueueHandle& +QueueHandle& SimpleQueue::getHandle() { return m_queueHandle; } -qpid::broker::AsyncStore* +AsyncStore* SimpleQueue::getStore() { return m_store; } @@ -85,9 +83,9 @@ SimpleQueue::getStore() { void SimpleQueue::asyncCreate() { if (m_store) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - &handleAsyncCreateResult, - &m_resultQueue)); + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + &handleAsyncCreateResult, + &m_resultQueue)); m_store->submitCreate(m_queueHandle, this, qac); ++m_asyncOpCounter; } @@ -95,10 +93,9 @@ SimpleQueue::asyncCreate() { //static void -SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) { +SimpleQueue::handleAsyncCreateResult(const AsyncResultHandle* const arh) { if (arh) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = - boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); if (arh->getErrNo()) { // TODO: Handle async failure here (other than by simply printing a message) @@ -116,9 +113,9 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) m_destroyPending = true; if (m_store) { if (deleteQueue) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - &handleAsyncDestroyResult, - &m_resultQueue)); + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + &handleAsyncDestroyResult, + &m_resultQueue)); m_store->submitDestroy(m_queueHandle, qac); ++m_asyncOpCounter; } @@ -128,10 +125,10 @@ SimpleQueue::asyncDestroy(const bool deleteQueue) //static void -SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) { +SimpleQueue::handleAsyncDestroyResult(const AsyncResultHandle* const arh) { if (arh) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = - boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<QueueAsyncContext> qc = + boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); if (arh->getErrNo()) { // TODO: Handle async failure here (other than by simply printing a message) @@ -145,30 +142,30 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con void SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { - boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg))); + boost::shared_ptr<SimpleQueuedMessage> qm(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(this, msg))); enqueue(qm); push(qm); } bool -SimpleQueue::dispatch(MessageConsumer& mc) { - boost::shared_ptr<QueuedMessage> qm; +SimpleQueue::dispatch(SimpleConsumer& sc) { + boost::shared_ptr<SimpleQueuedMessage> qm; if (m_messages->consume(qm)) { - boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false)); - mc.record(dr); + boost::shared_ptr<SimpleDeliveryRecord> dr(new SimpleDeliveryRecord(qm, sc, false)); + sc.record(dr); return true; } return false; } bool -SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) { +SimpleQueue::enqueue(boost::shared_ptr<SimpleQueuedMessage> qm) { return enqueue(0, qm); } bool -SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm) { +SimpleQueue::enqueue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; @@ -181,13 +178,13 @@ SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb, } bool -SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) { +SimpleQueue::dequeue(boost::shared_ptr<SimpleQueuedMessage> qm) { return dequeue(0, qm); } bool -SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm) { +SimpleQueue::dequeue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm) { ScopedUse u(m_barrier); if (!u.m_acquired) { return false; @@ -201,7 +198,7 @@ SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb, void SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { - push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg))); + push(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(this, msg))); } void @@ -238,7 +235,7 @@ SimpleQueue::getName() const { } void -SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) { +SimpleQueue::setExternalQueueStore(ExternalQueueStore* inst) { if (externalQueueStore != inst && externalQueueStore) delete externalQueueStore; externalQueueStore = inst; @@ -264,8 +261,7 @@ SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) : // protected bool -SimpleQueue::UsageBarrier::acquire() -{ +SimpleQueue::UsageBarrier::acquire() { qpid::sys::Monitor::ScopedLock l(m_monitor); if (m_parent.m_destroyed) { return false; @@ -276,8 +272,7 @@ SimpleQueue::UsageBarrier::acquire() } // protected -void SimpleQueue::UsageBarrier::release() -{ +void SimpleQueue::UsageBarrier::release() { qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); if (--m_count == 0) { m_monitor.notifyAll(); @@ -285,8 +280,7 @@ void SimpleQueue::UsageBarrier::release() } // protected -void SimpleQueue::UsageBarrier::destroy() -{ +void SimpleQueue::UsageBarrier::destroy() { qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); m_parent.m_destroyed = true; while (m_count) { @@ -301,8 +295,7 @@ SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) : {} // protected -SimpleQueue::ScopedUse::~ScopedUse() -{ +SimpleQueue::ScopedUse::~ScopedUse() { if (m_acquired) { m_barrier.release(); } @@ -310,9 +303,8 @@ SimpleQueue::ScopedUse::~ScopedUse() // private void -SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, - bool /*isRecovery*/) -{ +SimpleQueue::push(boost::shared_ptr<SimpleQueuedMessage> qm, + bool /*isRecovery*/) { m_messages->push(qm); } @@ -320,14 +312,14 @@ SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, // private bool -SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm) { +SimpleQueue::asyncEnqueue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm) { assert(qm.get()); - boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - qm->payload(), - tb, - &handleAsyncEnqueueResult, - &m_resultQueue)); + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + qm->payload(), + tb, + &handleAsyncEnqueueResult, + &m_resultQueue)); if (tb) { tb->incrOpCnt(); m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac); @@ -340,10 +332,10 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb, // private static void -SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) { +SimpleQueue::handleAsyncEnqueueResult(const AsyncResultHandle* const arh) { if (arh) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = - boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<QueueAsyncContext> qc = + boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); if (arh->getErrNo()) { // TODO: Handle async failure here (other than by simply printing a message) @@ -357,15 +349,14 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con // private bool -SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm) -{ +SimpleQueue::asyncDequeue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm) { assert(qm.get()); - boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - qm->payload(), - tb, - &handleAsyncDequeueResult, - &m_resultQueue)); + boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(), + qm->payload(), + tb, + &handleAsyncDequeueResult, + &m_resultQueue)); if (tb) { tb->incrOpCnt(); m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac); @@ -375,12 +366,12 @@ SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::br ++m_asyncOpCounter; return true; } + // private static void -SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) { +SimpleQueue::handleAsyncDequeueResult(const AsyncResultHandle* const arh) { if (arh) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = - boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); + boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext()); boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); if (arh->getErrNo()) { // TODO: Handle async failure here (other than by simply printing a message) @@ -404,7 +395,7 @@ SimpleQueue::destroyCheck(const std::string& opDescr) const { // private void -SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { +SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc) { if (qc.get()) { assert(qc->getQueue().get() == this); } @@ -413,7 +404,7 @@ SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncCont // private void -SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { +SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc) { if (qc.get()) { assert(qc->getQueue().get() == this); } @@ -422,7 +413,7 @@ SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncConte // private void -SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { +SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc) { if (qc.get()) { assert(qc->getQueue().get() == this); } @@ -432,7 +423,7 @@ SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncCon // private void -SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { +SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) { if (qc.get()) { assert(qc->getQueue().get() == this); if (qc->getTxnBuffer()) { // transactional enqueue @@ -444,7 +435,7 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncCon // private void -SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { +SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) { if (qc.get()) { assert(qc->getQueue().get() == this); if (qc->getTxnBuffer()) { // transactional enqueue @@ -454,4 +445,4 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncCon --m_asyncOpCounter; } -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/qpid/broker/SimpleQueue.h index 5f64c9b960..c2f21076cd 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ b/cpp/src/qpid/broker/SimpleQueue.h @@ -21,11 +21,11 @@ * \file SimpleQueue.h */ -#ifndef tests_storePerftools_asyncPerf_SimpleQueue_h_ -#define tests_storePerftools_asyncPerf_SimpleQueue_h_ +#ifndef qpid_broker_SimpleQueue_h_ +#define qpid_broker_SimpleQueue_h_ #include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource +#include "qpid/broker/AsyncStore.h" // DataSource #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/QueueHandle.h" #include "qpid/sys/Monitor.h" @@ -35,53 +35,50 @@ #include <boost/enable_shared_from_this.hpp> namespace qpid { -namespace broker { -class AsyncResultQueue; -class QueueAsyncContext; -class TxnBuffer; -} + namespace framing { class FieldTable; -}} +} -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace broker { -class MessageConsumer; -class Messages; -class QueuedMessage; +class AsyncResultQueue; +class QueueAsyncContext; +class SimpleConsumer; +class SimpleMessages; +class SimpleQueuedMessage; class SimpleMessage; +class SimpleTxnBuffer; class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>, - public qpid::broker::PersistableQueue, - public qpid::broker::DataSource + public PersistableQueue, + public DataSource { public: SimpleQueue(const std::string& name, const qpid::framing::FieldTable& args, - qpid::broker::AsyncStore* store, - qpid::broker::AsyncResultQueue& arq); + AsyncStore* store, + AsyncResultQueue& arq); virtual ~SimpleQueue(); - const qpid::broker::QueueHandle& getHandle() const; - qpid::broker::QueueHandle& getHandle(); - qpid::broker::AsyncStore* getStore(); + const QueueHandle& getHandle() const; + QueueHandle& getHandle(); + AsyncStore* getStore(); void asyncCreate(); - static void handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh); + static void handleAsyncCreateResult(const AsyncResultHandle* const arh); void asyncDestroy(const bool deleteQueue); - static void handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh); + static void handleAsyncDestroyResult(const AsyncResultHandle* const arh); // --- Methods in msg handling path from qpid::Queue --- void deliver(boost::intrusive_ptr<SimpleMessage> msg); - bool dispatch(MessageConsumer& mc); - bool enqueue(boost::shared_ptr<QueuedMessage> qm); - bool enqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm); - bool dequeue(boost::shared_ptr<QueuedMessage> qm); - bool dequeue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm); + bool dispatch(SimpleConsumer& sc); + bool enqueue(boost::shared_ptr<SimpleQueuedMessage> qm); + bool enqueue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm); + bool dequeue(boost::shared_ptr<SimpleQueuedMessage> qm); + bool dequeue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm); void process(boost::intrusive_ptr<SimpleMessage> msg); void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg); @@ -94,22 +91,22 @@ public: // --- Interface qpid::broker::PersistableQueue --- virtual void flush(); virtual const std::string& getName() const; - virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); + virtual void setExternalQueueStore(ExternalQueueStore* inst); // --- Interface qpid::broker::DataStore --- virtual uint64_t getSize(); virtual void write(char* target); private: - static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations + static TxnHandle s_nullTxnHandle; // used for non-txn operations const std::string m_name; - qpid::broker::AsyncStore* m_store; - qpid::broker::AsyncResultQueue& m_resultQueue; + AsyncStore* m_store; + AsyncResultQueue& m_resultQueue; qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter! mutable uint64_t m_persistenceId; std::string m_persistableData; - qpid::broker::QueueHandle m_queueHandle; + QueueHandle m_queueHandle; bool m_destroyPending; bool m_destroyed; @@ -130,29 +127,29 @@ private: ~ScopedUse(); }; UsageBarrier m_barrier; - std::auto_ptr<Messages> m_messages; - void push(boost::shared_ptr<QueuedMessage> qm, + std::auto_ptr<SimpleMessages> m_messages; + void push(boost::shared_ptr<SimpleQueuedMessage> qm, bool isRecovery = false); // -- Async ops --- - bool asyncEnqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm); - static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh); - bool asyncDequeue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm); - static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh); + bool asyncEnqueue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm); + static void handleAsyncEnqueueResult(const AsyncResultHandle* const arh); + bool asyncDequeue(SimpleTxnBuffer* tb, + boost::shared_ptr<SimpleQueuedMessage> qm); + static void handleAsyncDequeueResult(const AsyncResultHandle* const arh); // --- Async op counter --- void destroyCheck(const std::string& opDescr) const; // --- Async op completions (called through handleAsyncResult) --- - void createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc); - void flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc); - void destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc); - void enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc); - void dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc); + void createComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void flushComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc); + void dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc); }; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker -#endif // tests_storePerftools_asyncPerf_SimpleQueue_h_ +#endif // qpid_broker_SimpleQueue_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp index 0d16248c7f..35ac799ecc 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp @@ -18,27 +18,24 @@ */ /** - * \file QueuedMessage.cpp + * \file SimpleQueuedMessage.cpp */ -#include "QueuedMessage.h" +#include "SimpleQueuedMessage.h" #include "SimpleMessage.h" #include "SimpleQueue.h" -#include "qpid/asyncStore/AsyncStoreImpl.h" +namespace qpid { +namespace broker { -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -QueuedMessage::QueuedMessage() : +SimpleQueuedMessage::SimpleQueuedMessage() : m_queue(0) {} -QueuedMessage::QueuedMessage(SimpleQueue* q, - boost::intrusive_ptr<SimpleMessage> msg) : - boost::enable_shared_from_this<QueuedMessage>(), +SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr<SimpleMessage> msg) : + boost::enable_shared_from_this<SimpleQueuedMessage>(), m_queue(q), m_msg(msg) { @@ -47,63 +44,55 @@ QueuedMessage::QueuedMessage(SimpleQueue* q, } } -QueuedMessage::QueuedMessage(const QueuedMessage& qm) : - boost::enable_shared_from_this<QueuedMessage>(), +SimpleQueuedMessage::SimpleQueuedMessage(const SimpleQueuedMessage& qm) : + boost::enable_shared_from_this<SimpleQueuedMessage>(), m_queue(qm.m_queue), m_msg(qm.m_msg), m_enqHandle(qm.m_enqHandle) {} -QueuedMessage::QueuedMessage(QueuedMessage* const qm) : - boost::enable_shared_from_this<QueuedMessage>(), +SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueuedMessage* const qm) : + boost::enable_shared_from_this<SimpleQueuedMessage>(), m_queue(qm->m_queue), m_msg(qm->m_msg), m_enqHandle(qm->m_enqHandle) {} -QueuedMessage::~QueuedMessage() -{} +SimpleQueuedMessage::~SimpleQueuedMessage() {} SimpleQueue* -QueuedMessage::getQueue() const -{ +SimpleQueuedMessage::getQueue() const { return m_queue; } boost::intrusive_ptr<SimpleMessage> -QueuedMessage::payload() const -{ +SimpleQueuedMessage::payload() const { return m_msg; } -const qpid::broker::EnqueueHandle& -QueuedMessage::enqHandle() const -{ +const EnqueueHandle& +SimpleQueuedMessage::enqHandle() const { return m_enqHandle; } -qpid::broker::EnqueueHandle& -QueuedMessage::enqHandle() -{ +EnqueueHandle& +SimpleQueuedMessage::enqHandle() { return m_enqHandle; } void -QueuedMessage::prepareEnqueue(qpid::broker::TxnBuffer* tb) -{ +SimpleQueuedMessage::prepareEnqueue(SimpleTxnBuffer* tb) { m_queue->enqueue(tb, shared_from_this()); } void -QueuedMessage::commitEnqueue() -{ +SimpleQueuedMessage::commitEnqueue() { m_queue->process(m_msg); } void -QueuedMessage::abortEnqueue() -{ +SimpleQueuedMessage::abortEnqueue() { m_queue->enqueueAborted(m_msg); } -}}} // namespace tests::storePerfTools +}} // namespace qpid::broker diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/qpid/broker/SimpleQueuedMessage.h index 630fe1aedc..1172eb73f3 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ b/cpp/src/qpid/broker/SimpleQueuedMessage.h @@ -18,14 +18,14 @@ */ /** - * \file QueuedMessage.h + * \file SimpleQueuedMessage.h */ -#ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_ -#define tests_storePerftools_asyncPerf_QueuedMessage_h_ +#ifndef qpid_broker_SimpleQueuedMessage_h_ +#define qpid_broker_SimpleQueuedMessage_h_ -#include "qpid/broker/AsyncStore.h" -#include "qpid/broker/EnqueueHandle.h" +#include "AsyncStore.h" +#include "EnqueueHandle.h" #include <boost/enable_shared_from_this.hpp> #include <boost/intrusive_ptr.hpp> @@ -33,33 +33,25 @@ namespace qpid { namespace broker { -class TxnHandle; - -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - class SimpleMessage; class SimpleQueue; -class QueuedMessage : public boost::enable_shared_from_this<QueuedMessage> +class SimpleQueuedMessage : public boost::enable_shared_from_this<SimpleQueuedMessage> { public: - QueuedMessage(); - QueuedMessage(SimpleQueue* q, + SimpleQueuedMessage(); + SimpleQueuedMessage(SimpleQueue* q, boost::intrusive_ptr<SimpleMessage> msg); - QueuedMessage(const QueuedMessage& qm); - QueuedMessage(QueuedMessage* const qm); - virtual ~QueuedMessage(); + SimpleQueuedMessage(const SimpleQueuedMessage& qm); + SimpleQueuedMessage(SimpleQueuedMessage* const qm); + virtual ~SimpleQueuedMessage(); SimpleQueue* getQueue() const; boost::intrusive_ptr<SimpleMessage> payload() const; - const qpid::broker::EnqueueHandle& enqHandle() const; - qpid::broker::EnqueueHandle& enqHandle(); + const EnqueueHandle& enqHandle() const; + EnqueueHandle& enqHandle(); // --- Transaction handling --- - void prepareEnqueue(qpid::broker::TxnBuffer* tb); + void prepareEnqueue(qpid::broker::SimpleTxnBuffer* tb); void commitEnqueue(); void abortEnqueue(); @@ -69,6 +61,6 @@ private: qpid::broker::EnqueueHandle m_enqHandle; }; -}}} // namespace tests::storePerfTools +}} // namespace qpid::broker -#endif // tests_storePerftools_asyncPerf_QueuedMessage_h_ +#endif // qpid_broker_SimpleQueuedMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/qpid/broker/SimpleTxnAccept.cpp index 375cd568d2..343bbb54c7 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp +++ b/cpp/src/qpid/broker/SimpleTxnAccept.cpp @@ -18,31 +18,30 @@ */ /** - * \file TxnAccept.cpp + * \file SimpleTxnAccept.cpp */ -#include "TxnAccept.h" +#include "SimpleTxnAccept.h" -#include "DeliveryRecord.h" +#include "SimpleDeliveryRecord.h" #include "qpid/log/Statement.h" -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { -TxnAccept::TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops) : +SimpleTxnAccept::SimpleTxnAccept(std::deque<boost::shared_ptr<SimpleDeliveryRecord> >& ops) : m_ops(ops) {} -TxnAccept::~TxnAccept() {} +SimpleTxnAccept::~SimpleTxnAccept() {} // --- Interface TxnOp --- bool -TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() { +SimpleTxnAccept::prepare(SimpleTxnBuffer* tb) throw() { try { - for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + for (std::deque<boost::shared_ptr<SimpleDeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->dequeue(tb); } return true; @@ -55,9 +54,9 @@ TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() { } void -TxnAccept::commit() throw() { +SimpleTxnAccept::commit() throw() { try { - for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { + for (std::deque<boost::shared_ptr<SimpleDeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { (*i)->committed(); (*i)->setEnded(); } @@ -69,6 +68,6 @@ TxnAccept::commit() throw() { } void -TxnAccept::rollback() throw() {} +SimpleTxnAccept::rollback() throw() {} -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/qpid/broker/SimpleTxnAccept.h index 5d84289965..eb6963bc88 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h +++ b/cpp/src/qpid/broker/SimpleTxnAccept.h @@ -18,36 +18,35 @@ */ /** - * \file TxnAccept.h + * \file SimpleTxnAccept.h */ -#ifndef tests_storePerftools_asyncPerf_TxnAccept_h_ -#define tests_storePerftools_asyncPerf_TxnAccept_h_ +#ifndef tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ +#define tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ -#include "qpid/broker/TxnOp.h" +#include "SimpleTxnOp.h" #include "boost/shared_ptr.hpp" #include <deque> -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { -class DeliveryRecord; +class SimpleDeliveryRecord; -class TxnAccept: public qpid::broker::TxnOp { +class SimpleTxnAccept: public SimpleTxnOp { public: - TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops); - virtual ~TxnAccept(); + SimpleTxnAccept(std::deque<boost::shared_ptr<SimpleDeliveryRecord> >& ops); + virtual ~SimpleTxnAccept(); // --- Interface TxnOp --- - bool prepare(qpid::broker::TxnBuffer* tb) throw(); + bool prepare(SimpleTxnBuffer* tb) throw(); void commit() throw(); void rollback() throw(); private: - std::deque<boost::shared_ptr<DeliveryRecord> > m_ops; + std::deque<boost::shared_ptr<SimpleDeliveryRecord> > m_ops; }; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker -#endif // tests_storePerftools_asyncPerf_TxnAccept_h_ +#endif // tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp index 4d6e7b7918..d72a785c2a 100644 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp @@ -18,14 +18,14 @@ */ /** - * \file TxnBuffer.cpp + * \file SimpleTxnBuffer.cpp */ -#include "TxnBuffer.h" +#include "SimpleTxnBuffer.h" #include "AsyncResultHandle.h" +#include "SimpleTxnOp.h" #include "TxnAsyncContext.h" -#include "TxnOp.h" #include "qpid/log/Statement.h" @@ -34,9 +34,9 @@ namespace qpid { namespace broker { -qpid::sys::Mutex TxnBuffer::s_uuidMutex; +qpid::sys::Mutex SimpleTxnBuffer::s_uuidMutex; -TxnBuffer::TxnBuffer(AsyncResultQueue& arq) : +SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq) : m_store(0), m_resultQueue(arq), m_tpcFlag(false), @@ -47,7 +47,7 @@ TxnBuffer::TxnBuffer(AsyncResultQueue& arq) : createLocalXid(); } -TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) : +SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid) : m_store(0), m_resultQueue(arq), m_xid(xid), @@ -61,31 +61,31 @@ TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) : } } -TxnBuffer::~TxnBuffer() {} +SimpleTxnBuffer::~SimpleTxnBuffer() {} TxnHandle& -TxnBuffer::getTxnHandle() { +SimpleTxnBuffer::getTxnHandle() { return m_txnHandle; } const std::string& -TxnBuffer::getXid() const { +SimpleTxnBuffer::getXid() const { return m_xid; } bool -TxnBuffer::is2pc() const { +SimpleTxnBuffer::is2pc() const { return m_tpcFlag; } void -TxnBuffer::incrOpCnt() { +SimpleTxnBuffer::incrOpCnt() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_submitOpCntMutex); ++m_submitOpCnt; } void -TxnBuffer::decrOpCnt() { +SimpleTxnBuffer::decrOpCnt() { const uint32_t numOps = getNumOps(); qpid::sys::ScopedLock<qpid::sys::Mutex> l2(m_completeOpCntMutex); qpid::sys::ScopedLock<qpid::sys::Mutex> l3(m_submitOpCntMutex); @@ -99,15 +99,15 @@ TxnBuffer::decrOpCnt() { } void -TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) { +SimpleTxnBuffer::enlist(boost::shared_ptr<SimpleTxnOp> op) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); m_ops.push_back(op); } bool -TxnBuffer::prepare() { +SimpleTxnBuffer::prepare() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); - for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { if (!(*i)->prepare(this)) { return false; } @@ -116,25 +116,25 @@ TxnBuffer::prepare() { } void -TxnBuffer::commit() { +SimpleTxnBuffer::commit() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); - for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->commit(); } m_ops.clear(); } void -TxnBuffer::rollback() { +SimpleTxnBuffer::rollback() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); - for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { (*i)->rollback(); } m_ops.clear(); } bool -TxnBuffer::commitLocal(AsyncTransactionalStore* const store) { +SimpleTxnBuffer::commitLocal(AsyncTransactionalStore* const store) { try { m_store = store; asyncLocalCommit(); @@ -147,7 +147,7 @@ TxnBuffer::commitLocal(AsyncTransactionalStore* const store) { } void -TxnBuffer::asyncLocalCommit() { +SimpleTxnBuffer::asyncLocalCommit() { switch(m_state) { case NONE: m_state = PREPARE; @@ -180,7 +180,7 @@ TxnBuffer::asyncLocalCommit() { //static void -TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { +SimpleTxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { if (arh) { boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); if (arh->getErrNo()) { @@ -194,7 +194,7 @@ TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { } void -TxnBuffer::asyncLocalAbort() { +SimpleTxnBuffer::asyncLocalAbort() { assert(m_store != 0); switch (m_state) { case NONE: @@ -218,7 +218,7 @@ TxnBuffer::asyncLocalAbort() { //static void -TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { +SimpleTxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { if (arh) { boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); if (arh->getErrNo()) { @@ -231,14 +231,14 @@ TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { // private uint32_t -TxnBuffer::getNumOps() const { +SimpleTxnBuffer::getNumOps() const { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex); return m_ops.size(); } // private void -TxnBuffer::createLocalXid() +SimpleTxnBuffer::createLocalXid() { uuid_t uuid; { diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/SimpleTxnBuffer.h index 02569f6545..b2164cfeed 100644 --- a/cpp/src/qpid/broker/TxnBuffer.h +++ b/cpp/src/qpid/broker/SimpleTxnBuffer.h @@ -18,11 +18,11 @@ */ /** - * \file TxnBuffer.h + * \file SimpleTxnBuffer.h */ -#ifndef qpid_broker_TxnBuffer_h_ -#define qpid_broker_TxnBuffer_h_ +#ifndef qpid_broker_SimpleTxnBuffer_h_ +#define qpid_broker_SimpleTxnBuffer_h_ #include "TxnHandle.h" @@ -37,20 +37,20 @@ namespace broker { class AsyncResultHandle; class AsyncResultQueue; class AsyncTransactionalStore; -class TxnOp; +class SimpleTxnOp; -class TxnBuffer { +class SimpleTxnBuffer { public: - TxnBuffer(AsyncResultQueue& arq); - TxnBuffer(AsyncResultQueue& arq, std::string& xid); - virtual ~TxnBuffer(); + SimpleTxnBuffer(AsyncResultQueue& arq); + SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid); + virtual ~SimpleTxnBuffer(); TxnHandle& getTxnHandle(); const std::string& getXid() const; bool is2pc() const; void incrOpCnt(); void decrOpCnt(); - void enlist(boost::shared_ptr<TxnOp> op); + void enlist(boost::shared_ptr<SimpleTxnOp> op); bool prepare(); void commit(); void rollback(); @@ -68,7 +68,7 @@ private: mutable qpid::sys::Mutex m_completeOpCntMutex; static qpid::sys::Mutex s_uuidMutex; - std::vector<boost::shared_ptr<TxnOp> > m_ops; + std::vector<boost::shared_ptr<SimpleTxnOp> > m_ops; TxnHandle m_txnHandle; AsyncTransactionalStore* m_store; AsyncResultQueue& m_resultQueue; @@ -86,4 +86,4 @@ private: }} // namespace qpid::broker -#endif // qpid_broker_TxnBuffer_h_ +#endif // qpid_broker_SimpleTxnBuffer_h_ diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/SimpleTxnOp.h index bcff87551c..2cec2da8f0 100644 --- a/cpp/src/qpid/broker/TxnOp.h +++ b/cpp/src/qpid/broker/SimpleTxnOp.h @@ -18,27 +18,27 @@ */ /** - * \file TxnOp.h + * \file SimpleTxnOp.h */ -#ifndef qpid_broker_TxnOp_h_ -#define qpid_broker_TxnOp_h_ +#ifndef qpid_broker_SimpleTxnOp_h_ +#define qpid_broker_SimpleTxnOp_h_ #include <boost/shared_ptr.hpp> namespace qpid { namespace broker { -class TxnBuffer; +class SimpleTxnBuffer; -class TxnOp{ +class SimpleTxnOp{ public: - virtual ~TxnOp() {} - virtual bool prepare(qpid::broker::TxnBuffer*) throw() = 0; + virtual ~SimpleTxnOp() {} + virtual bool prepare(SimpleTxnBuffer*) throw() = 0; virtual void commit() throw() = 0; virtual void rollback() throw() = 0; }; }} // namespace qpid::broker -#endif // qpid_broker_TxnOp_h_ +#endif // qpid_broker_SimpleTxnOp_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/qpid/broker/SimpleTxnPublish.cpp index cc36a38be7..6ad6a108ea 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ b/cpp/src/qpid/broker/SimpleTxnPublish.cpp @@ -18,30 +18,29 @@ */ /** - * \file TxnPublish.cpp + * \file SimpleTxnPublish.cpp */ -#include "TxnPublish.h" +#include "SimpleTxnPublish.h" -#include "QueuedMessage.h" #include "SimpleMessage.h" #include "SimpleQueue.h" +#include "SimpleQueuedMessage.h" #include "qpid/log/Statement.h" #include <boost/make_shared.hpp> -namespace tests { -namespace storePerftools { -namespace asyncPerf { +namespace qpid { +namespace broker { -TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) : +SimpleTxnPublish::SimpleTxnPublish(boost::intrusive_ptr<SimpleMessage> msg) : m_msg(msg) {} -TxnPublish::~TxnPublish() {} +SimpleTxnPublish::~SimpleTxnPublish() {} bool -TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() { +SimpleTxnPublish::prepare(SimpleTxnBuffer* tb) throw() { try { while (!m_queues.empty()) { m_queues.front()->prepareEnqueue(tb); @@ -58,9 +57,9 @@ TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() { } void -TxnPublish::commit() throw() { +SimpleTxnPublish::commit() throw() { try { - for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + for (std::list<boost::shared_ptr<SimpleQueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { (*i)->commitEnqueue(); } } catch (const std::exception& e) { @@ -71,9 +70,9 @@ TxnPublish::commit() throw() { } void -TxnPublish::rollback() throw() { +SimpleTxnPublish::rollback() throw() { try { - for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + for (std::list<boost::shared_ptr<SimpleQueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { (*i)->abortEnqueue(); } } catch (const std::exception& e) { @@ -84,19 +83,19 @@ TxnPublish::rollback() throw() { } uint64_t -TxnPublish::contentSize() { +SimpleTxnPublish::contentSize() { return m_msg->contentSize(); } void -TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) { - m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg))); +SimpleTxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) { + m_queues.push_back(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(queue.get(), m_msg))); m_delivered = true; } SimpleMessage& -TxnPublish::getMessage() { +SimpleTxnPublish::getMessage() { return *m_msg; } -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/qpid/broker/SimpleTxnPublish.h index eae9ef9c4c..0aaf8e4ba0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h +++ b/cpp/src/qpid/broker/SimpleTxnPublish.h @@ -18,37 +18,36 @@ */ /** - * \file TxnPublish.h + * \file SimpleTxnPublish.h */ -#ifndef tests_storePerftools_asyncPerf_TxnPublish_h_ -#define tests_storePerftools_asyncPerf_TxnPublish_h_ +#ifndef qpid_broker_SimpleTxnPublish_h_ +#define qpid_broker_SimpleTxnPublish_h_ -#include "Deliverable.h" - -#include "qpid/broker/TxnOp.h" +#include "SimpleDeliverable.h" +#include "SimpleTxnOp.h" #include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> #include <list> -namespace tests { -namespace storePerftools { -namespace asyncPerf { -class QueuedMessage; +namespace qpid { +namespace broker { + +class SimpleQueuedMessage; class SimpleMessage; class SimpleQueue; -class TxnPublish : public qpid::broker::TxnOp, - public Deliverable +class SimpleTxnPublish : public SimpleTxnOp, + public SimpleDeliverable { public: - TxnPublish(boost::intrusive_ptr<SimpleMessage> msg); - virtual ~TxnPublish(); + SimpleTxnPublish(boost::intrusive_ptr<SimpleMessage> msg); + virtual ~SimpleTxnPublish(); // --- Interface TxOp --- - bool prepare(qpid::broker::TxnBuffer* tb) throw(); + bool prepare(SimpleTxnBuffer* tb) throw(); void commit() throw(); void rollback() throw(); @@ -59,10 +58,10 @@ public: private: boost::intrusive_ptr<SimpleMessage> m_msg; - std::list<boost::shared_ptr<QueuedMessage> > m_queues; - std::list<boost::shared_ptr<QueuedMessage> > m_prepared; + std::list<boost::shared_ptr<SimpleQueuedMessage> > m_queues; + std::list<boost::shared_ptr<SimpleQueuedMessage> > m_prepared; }; -}}} // namespace tests::storePerftools::asyncPerf +}} // namespace qpid::broker -#endif // tests_storePerftools_asyncPerf_TxnPublish_h_ +#endif // qpid_broker_SimpleTxnPublish_h_ diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp index 63e2de2b41..527cb4741f 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.cpp +++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp @@ -26,7 +26,7 @@ namespace qpid { namespace broker { -TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, +TxnAsyncContext::TxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq): m_tb(tb), @@ -37,7 +37,7 @@ TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, TxnAsyncContext::~TxnAsyncContext() {} -TxnBuffer* +SimpleTxnBuffer* TxnAsyncContext::getTxnBuffer() const { return m_tb; diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h index 9c617238e8..04f6ef76f5 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.h +++ b/cpp/src/qpid/broker/TxnAsyncContext.h @@ -29,38 +29,34 @@ #include "qpid/asyncStore/AsyncOperation.h" namespace qpid { -//namespace asyncStore { -//class AsyncOperation; -//} namespace broker { class AsyncResultHandle; class AsyncResultQueue; -//class TxnHandle; typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); class TxnAsyncContext: public BrokerAsyncContext { public: - TxnAsyncContext(TxnBuffer* const tb, + TxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); virtual ~TxnAsyncContext(); - TxnBuffer* getTxnBuffer() const; + SimpleTxnBuffer* getTxnBuffer() const; // --- Interface BrokerAsyncContext --- AsyncResultQueue* getAsyncResultQueue() const; void invokeCallback(const AsyncResultHandle* const) const; private: - TxnBuffer* const m_tb; + SimpleTxnBuffer* const m_tb; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; class TpcTxnAsyncContext : public TxnAsyncContext { public: - TpcTxnAsyncContext(TxnBuffer* const tb, + TpcTxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : TxnAsyncContext(tb, rcb, arq) diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake index 94631bb8ea..3dcd81c3d7 100644 --- a/cpp/src/tests/asyncstore.cmake +++ b/cpp/src/tests/asyncstore.cmake @@ -51,20 +51,20 @@ endif (UNIX) # Async store perf test (asyncPerf) set (asyncStorePerf_SOURCES - storePerftools/asyncPerf/Deliverable.cpp - storePerftools/asyncPerf/DeliveryRecord.cpp - storePerftools/asyncPerf/MessageAsyncContext.cpp storePerftools/asyncPerf/MessageConsumer.cpp - storePerftools/asyncPerf/MessageDeque.cpp storePerftools/asyncPerf/MessageProducer.cpp storePerftools/asyncPerf/PerfTest.cpp - storePerftools/asyncPerf/QueuedMessage.cpp - storePerftools/asyncPerf/SimpleMessage.cpp - storePerftools/asyncPerf/SimpleQueue.cpp +# storePerftools/asyncPerf/SimpleDeliverable.cpp +# storePerftools/asyncPerf/SimpleDeliveryRecord.cpp +# storePerftools/asyncPerf/SimpleMessage.cpp +# storePerftools/asyncPerf/SimpleMessageAsyncContext.cpp +# storePerftools/asyncPerf/SimpleMessageDeque.cpp +# storePerftools/asyncPerf/SimpleQueue.cpp +# storePerftools/asyncPerf/SimpleQueuedMessage.cpp +# storePerftools/asyncPerf/SimpleTxnAccept.cpp +# storePerftools/asyncPerf/SimpleTxnPublish.cpp storePerftools/asyncPerf/TestOptions.cpp storePerftools/asyncPerf/TestResult.cpp - storePerftools/asyncPerf/TxnAccept.cpp - storePerftools/asyncPerf/TxnPublish.cpp storePerftools/common/Parameters.cpp storePerftools/common/PerftoolError.cpp diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 6aa477c470..6477696bd6 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -23,13 +23,12 @@ #include "MessageConsumer.h" -#include "DeliveryRecord.h" -#include "SimpleQueue.h" #include "TestOptions.h" -#include "TxnAccept.h" -#include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/TxnBuffer.h" +#include "qpid/broker/SimpleDeliveryRecord.h" +#include "qpid/broker/SimpleQueue.h" +#include "qpid/broker/SimpleTxnAccept.h" +#include "qpid/broker/SimpleTxnBuffer.h" #include <stdint.h> // uint32_t @@ -38,9 +37,9 @@ namespace storePerftools { namespace asyncPerf { MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimpleQueue> queue) : + boost::shared_ptr<qpid::broker::SimpleQueue> queue) : m_perfTestParams(perfTestParams), m_store(store), m_resultQueue(arq), @@ -50,7 +49,7 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, MessageConsumer::~MessageConsumer() {} void -MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) { +MessageConsumer::record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr) { m_unacked.push_back(dr); } @@ -61,9 +60,9 @@ void* MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; uint16_t opsInTxnCnt = 0U; - qpid::broker::TxnBuffer* tb = 0; + qpid::broker::SimpleTxnBuffer* tb = 0; if (useTxns) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } uint32_t msgsPerConsumer = m_perfTestParams.m_numEnqThreadsPerQueue * m_perfTestParams.m_numMsgs / @@ -74,19 +73,19 @@ MessageConsumer::runConsumers() { ++numMsgs; if (useTxns) { // --- Transactional dequeue --- - boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked)); + boost::shared_ptr<qpid::broker::SimpleTxnAccept> ta(new qpid::broker::SimpleTxnAccept(m_unacked)); m_unacked.clear(); tb->enlist(ta); if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { tb->commitLocal(m_store); if (numMsgs < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } opsInTxnCnt = 0U; } } else { // --- Non-transactional dequeue --- - for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { + for (std::deque<boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { (*i)->accept(); } m_unacked.clear(); diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h index b110520889..d5a881f7e0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h @@ -24,44 +24,44 @@ #ifndef tests_storePerftools_asyncPerf_MessageConsumer_h_ #define tests_storePerftools_asyncPerf_MessageConsumer_h_ +#include "qpid/broker/SimpleConsumer.h" + #include "boost/shared_ptr.hpp" #include <deque> namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} namespace broker { class AsyncResultQueue; +class AsyncStore; +class SimpleDeliveryRecord; +class SimpleQueue; }} namespace tests { namespace storePerftools { namespace asyncPerf { -class DeliveryRecord; -class SimpleQueue; class TestOptions; -class MessageConsumer +class MessageConsumer: public qpid::broker::SimpleConsumer { public: MessageConsumer(const TestOptions& perfTestParams, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimpleQueue> queue); + boost::shared_ptr<qpid::broker::SimpleQueue> queue); virtual ~MessageConsumer(); - void record(boost::shared_ptr<DeliveryRecord> dr); + void record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr); void commitComplete(); void* runConsumers(); static void* startConsumers(void* ptr); private: const TestOptions& m_perfTestParams; - qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncStore* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - boost::shared_ptr<SimpleQueue> m_queue; - std::deque<boost::shared_ptr<DeliveryRecord> > m_unacked; + boost::shared_ptr<qpid::broker::SimpleQueue> m_queue; + std::deque<boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> > m_unacked; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 974f3f3981..f88d305a38 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -23,13 +23,12 @@ #include "MessageProducer.h" -#include "SimpleMessage.h" -#include "SimpleQueue.h" #include "TestOptions.h" -#include "TxnPublish.h" -#include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/TxnBuffer.h" +#include "qpid/broker/SimpleMessage.h" +#include "qpid/broker/SimpleQueue.h" +#include "qpid/broker/SimpleTxnBuffer.h" +#include "qpid/broker/SimpleTxnPublish.h" #include <stdint.h> // uint32_t @@ -39,9 +38,9 @@ namespace asyncPerf { MessageProducer::MessageProducer(const TestOptions& perfTestParams, const char* msgData, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimpleQueue> queue) : + boost::shared_ptr<qpid::broker::SimpleQueue> queue) : m_perfTestParams(perfTestParams), m_msgData(msgData), m_store(store), @@ -55,14 +54,14 @@ void* MessageProducer::runProducers() { const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U; uint16_t recsInTxnCnt = 0U; - qpid::broker::TxnBuffer* tb = 0; + qpid::broker::SimpleTxnBuffer* tb = 0; if (useTxns) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) { - boost::intrusive_ptr<SimpleMessage> msg(new SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); + boost::intrusive_ptr<qpid::broker::SimpleMessage> msg(new qpid::broker::SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); if (useTxns) { - boost::shared_ptr<TxnPublish> op(new TxnPublish(msg)); + boost::shared_ptr<qpid::broker::SimpleTxnPublish> op(new qpid::broker::SimpleTxnPublish(msg)); op->deliverTo(m_queue); tb->enlist(op); if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) { @@ -72,7 +71,7 @@ MessageProducer::runProducers() { // transaction until the current commit cycle completes. So use another instance. This // instance should auto-delete when the async commit cycle completes. if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } recsInTxnCnt = 0U; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h index 127408e3db..6f98d03503 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -27,19 +27,17 @@ #include "boost/shared_ptr.hpp" namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} namespace broker { class AsyncResultQueue; -class TxnBuffer; +class AsyncStore; +class SimpleQueue; +class SimpleTxnBuffer; }} namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleQueue; class TestOptions; class MessageProducer @@ -47,18 +45,18 @@ class MessageProducer public: MessageProducer(const TestOptions& perfTestParams, const char* msgData, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimpleQueue> queue); + boost::shared_ptr<qpid::broker::SimpleQueue> queue); virtual ~MessageProducer(); void* runProducers(); static void* startProducers(void* ptr); private: const TestOptions& m_perfTestParams; const char* m_msgData; - qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncStore* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - boost::shared_ptr<SimpleQueue> m_queue; + boost::shared_ptr<qpid::broker::SimpleQueue> m_queue; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 6377cc0d85..c05eb0487d 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -25,7 +25,6 @@ #include "MessageConsumer.h" #include "MessageProducer.h" -#include "SimpleQueue.h" #include "tests/storePerftools/version.h" #include "tests/storePerftools/common/ScopedTimer.h" @@ -34,6 +33,7 @@ #include "qpid/Modules.h" // Use with loading store as module #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/asyncStore/AsyncStoreOptions.h" +#include "qpid/broker/SimpleQueue.h" #include <iomanip> @@ -55,8 +55,7 @@ PerfTest::PerfTest(const TestOptions& to, std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize); } -PerfTest::~PerfTest() -{ +PerfTest::~PerfTest() { m_poller->shutdown(); m_pollingThread.join(); @@ -68,8 +67,7 @@ PerfTest::~PerfTest() } void -PerfTest::run() -{ +PerfTest::run() { if (m_testOpts.m_durable) { prepareStore(); } @@ -113,8 +111,7 @@ PerfTest::run() } void -PerfTest::toStream(std::ostream& os) const -{ +PerfTest::toStream(std::ostream& os) const { m_testOpts.printVals(os); os << std::endl; m_storeOpts.printVals(os); @@ -124,16 +121,15 @@ PerfTest::toStream(std::ostream& os) const // private void -PerfTest::prepareStore() -{ - m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); - m_store->initialize(); +PerfTest::prepareStore() { + qpid::asyncStore::AsyncStoreImpl* s = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); + s->initialize(); + m_store = s; } // private void -PerfTest::destroyStore() -{ +PerfTest::destroyStore() { if (m_store) { delete m_store; } @@ -141,12 +137,11 @@ PerfTest::destroyStore() // private void -PerfTest::prepareQueues() -{ +PerfTest::prepareQueues() { for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - boost::shared_ptr<SimpleQueue> mpq(new SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); + boost::shared_ptr<qpid::broker::SimpleQueue> mpq(new qpid::broker::SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); mpq->asyncCreate(); m_queueList.push_back(mpq); } @@ -154,8 +149,7 @@ PerfTest::prepareQueues() // private void -PerfTest::destroyQueues() -{ +PerfTest::destroyQueues() { while (m_queueList.size() > 0) { m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion); m_queueList.pop_front(); @@ -163,8 +157,7 @@ PerfTest::destroyQueues() } int -runPerfTest(int argc, char** argv) -{ +runPerfTest(int argc, char** argv) { // Load async store module qpid::tryShlib ("asyncStore.so", false); @@ -212,7 +205,6 @@ runPerfTest(int argc, char** argv) // ----------------------------------------------------------------- int -main(int argc, char** argv) -{ +main(int argc, char** argv) { return tests::storePerftools::asyncPerf::runPerfTest(argc, argv); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index e4d99021b5..27d8d08faf 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -37,9 +37,11 @@ namespace qpid { namespace asyncStore { -class AsyncStoreImpl; class AsyncStoreOptions; } +namespace broker { +class SimpleQueue; +} namespace sys { class Poller; }} @@ -48,7 +50,6 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleQueue; class MessageConsumer; class MessageProducer; class TestOptions; @@ -73,8 +74,8 @@ private: boost::shared_ptr<qpid::sys::Poller> m_poller; qpid::sys::Thread m_pollingThread; qpid::broker::AsyncResultQueueImpl m_resultQueue; - qpid::asyncStore::AsyncStoreImpl* m_store; - std::deque<boost::shared_ptr<SimpleQueue> > m_queueList; + qpid::broker::AsyncStore* m_store; + std::deque<boost::shared_ptr<qpid::broker::SimpleQueue> > m_queueList; std::deque<boost::shared_ptr<MessageProducer> > m_producers; std::deque<boost::shared_ptr<MessageConsumer> > m_consumers; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp index 8c1f2976bf..20e9c39f1b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp @@ -62,12 +62,10 @@ TestOptions::TestOptions(const uint32_t numMsgs, doAddOptions(); } -TestOptions::~TestOptions() -{} +TestOptions::~TestOptions() {} void -TestOptions::printVals(std::ostream& os) const -{ +TestOptions::printVals(std::ostream& os) const { tests::storePerftools::common::TestOptions::printVals(os); os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl; os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl; @@ -77,8 +75,7 @@ TestOptions::printVals(std::ostream& os) const // private void -TestOptions::doAddOptions() -{ +TestOptions::doAddOptions() { addOptions() ("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"), "Num enqueus per transaction (0 = no transactions)") diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp index cf6f293494..312fa187b8 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp @@ -32,12 +32,10 @@ TestResult::TestResult(const TestOptions& to) : m_testOpts(to) {} -TestResult::~TestResult() -{} +TestResult::~TestResult() {} void -TestResult::toStream(std::ostream& os) const -{ +TestResult::toStream(std::ostream& os) const { double msgsRate; os << "TEST RESULTS:" << std::endl; os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl; |