diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-01 14:05:21 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-01 14:05:21 +0000 |
commit | 80bfab9ed823cebd9f8f58b559fd32df108bcf7d (patch) | |
tree | 191bf724b9bf5b8394343d60ac4eac804e9c3d3a /cpp/src/tests | |
parent | 63c6598f401ac6406e5a31c602c7892b798536fc (diff) | |
download | qpid-python-80bfab9ed823cebd9f8f58b559fd32df108bcf7d.tar.gz |
QPID-3858: WIP: Moving Simple* test classes into the correct namespaces so as to correspond with broker classes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368006 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
28 files changed, 75 insertions, 1944 deletions
diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake index 94631bb8ea..3dcd81c3d7 100644 --- a/cpp/src/tests/asyncstore.cmake +++ b/cpp/src/tests/asyncstore.cmake @@ -51,20 +51,20 @@ endif (UNIX) # Async store perf test (asyncPerf) set (asyncStorePerf_SOURCES - storePerftools/asyncPerf/Deliverable.cpp - storePerftools/asyncPerf/DeliveryRecord.cpp - storePerftools/asyncPerf/MessageAsyncContext.cpp storePerftools/asyncPerf/MessageConsumer.cpp - storePerftools/asyncPerf/MessageDeque.cpp storePerftools/asyncPerf/MessageProducer.cpp storePerftools/asyncPerf/PerfTest.cpp - storePerftools/asyncPerf/QueuedMessage.cpp - storePerftools/asyncPerf/SimpleMessage.cpp - storePerftools/asyncPerf/SimpleQueue.cpp +# storePerftools/asyncPerf/SimpleDeliverable.cpp +# storePerftools/asyncPerf/SimpleDeliveryRecord.cpp +# storePerftools/asyncPerf/SimpleMessage.cpp +# storePerftools/asyncPerf/SimpleMessageAsyncContext.cpp +# storePerftools/asyncPerf/SimpleMessageDeque.cpp +# storePerftools/asyncPerf/SimpleQueue.cpp +# storePerftools/asyncPerf/SimpleQueuedMessage.cpp +# storePerftools/asyncPerf/SimpleTxnAccept.cpp +# storePerftools/asyncPerf/SimpleTxnPublish.cpp storePerftools/asyncPerf/TestOptions.cpp storePerftools/asyncPerf/TestResult.cpp - storePerftools/asyncPerf/TxnAccept.cpp - storePerftools/asyncPerf/TxnPublish.cpp storePerftools/common/Parameters.cpp storePerftools/common/PerftoolError.cpp diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp deleted file mode 100644 index 9da7e348e0..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Deliverable.cpp - */ - -#include "Deliverable.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -Deliverable::Deliverable() : - m_delivered(false) -{} - -Deliverable::~Deliverable() -{} - -bool -Deliverable::isDelivered() const -{ - return m_delivered; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h deleted file mode 100644 index 990d53a199..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Deliverable.h - */ - -#ifndef tests_storePerftools_asyncPerf_Deliverable_h_ -#define tests_storePerftools_asyncPerf_Deliverable_h_ - -#include <boost/shared_ptr.hpp> -#include <stdint.h> // uint64_t - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class SimpleMessage; -class SimpleQueue; - -class Deliverable -{ -public: - Deliverable(); - virtual ~Deliverable(); - - virtual uint64_t contentSize() = 0; - virtual void deliverTo(const boost::shared_ptr<SimpleQueue>& queue) = 0; - virtual SimpleMessage& getMessage() = 0; - virtual bool isDelivered() const; - -protected: - bool m_delivered; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_Deliverable_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp deleted file mode 100644 index 6f33369a26..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file DeliveryRecord.cpp - */ - -#include "DeliveryRecord.h" - -#include "MessageConsumer.h" -#include "QueuedMessage.h" -#include "SimpleMessage.h" -#include "SimpleQueue.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -DeliveryRecord::DeliveryRecord(boost::shared_ptr<QueuedMessage> qm, - MessageConsumer& mc, - bool accepted) : - m_queuedMessage(qm), - m_msgConsumer(mc), - m_accepted(accepted), - m_ended(accepted) -{} - -DeliveryRecord::~DeliveryRecord() -{} - -bool -DeliveryRecord::accept() -{ - if (!m_ended) { - m_queuedMessage->getQueue()->dequeue(m_queuedMessage); - m_accepted = true; - setEnded(); - } - return isRedundant(); -} - -bool -DeliveryRecord::isAccepted() const -{ - return m_accepted; -} - -bool -DeliveryRecord::setEnded() -{ - m_ended = true; - m_queuedMessage->payload() = boost::intrusive_ptr<SimpleMessage>(0); - return isRedundant(); -} - -bool -DeliveryRecord::isEnded() const -{ - return m_ended; -} - -bool -DeliveryRecord::isRedundant() const -{ - return m_ended; -} - -void -DeliveryRecord::dequeue(qpid::broker::TxnBuffer* tb) -{ - m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage); -} - -void -DeliveryRecord::committed() const -{ - m_msgConsumer.commitComplete(); -} - -boost::shared_ptr<QueuedMessage> -DeliveryRecord::getQueuedMessage() const -{ - return m_queuedMessage; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h deleted file mode 100644 index 6c5d87f374..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file DeliveryRecord.h - */ - -#ifndef tests_storePerftools_asyncPerf_DeliveryRecord_h_ -#define tests_storePerftools_asyncPerf_DeliveryRecord_h_ - -//#include "QueuedMessage.h" - -#include <boost/shared_ptr.hpp> - -namespace qpid { -namespace broker { -class TxnBuffer; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MessageConsumer; -class QueuedMessage; - -class DeliveryRecord { -public: - DeliveryRecord(boost::shared_ptr<QueuedMessage> qm, - MessageConsumer& mc, - bool accepted); - virtual ~DeliveryRecord(); - bool accept(); - bool isAccepted() const; - bool setEnded(); - bool isEnded() const; - bool isRedundant() const; - void dequeue(qpid::broker::TxnBuffer* tb); - void committed() const; - boost::shared_ptr<QueuedMessage> getQueuedMessage() const; -private: - boost::shared_ptr<QueuedMessage> m_queuedMessage; - MessageConsumer& m_msgConsumer; - bool m_accepted : 1; - bool m_ended : 1; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_DeliveryRecord_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp deleted file mode 100644 index e3bfe9ae7a..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MessageContext.cpp - */ - -#include "MessageAsyncContext.h" - -#include "SimpleMessage.h" - -#include <cassert> - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, - boost::shared_ptr<SimpleQueue> q) : - m_msg(msg), - m_q(q) -{ - assert(m_msg.get() != 0); - assert(m_q.get() != 0); -} - -MessageAsyncContext::~MessageAsyncContext() -{} - -boost::intrusive_ptr<SimpleMessage> -MessageAsyncContext::getMessage() const -{ - return m_msg; -} - -boost::shared_ptr<SimpleQueue> -MessageAsyncContext::getQueue() const -{ - return m_q; -} - -void -MessageAsyncContext::destroy() -{ - delete this; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h deleted file mode 100644 index 9252fbda45..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MessageContext.h - */ - -#ifndef tests_storePerfTools_asyncPerf_MessageContext_h_ -#define tests_storePerfTools_asyncPerf_MessageContext_h_ - -#include "qpid/asyncStore/AsyncOperation.h" - -#include <boost/intrusive_ptr.hpp> -#include <boost/shared_ptr.hpp> - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class SimpleMessage; -class SimpleQueue; - -class MessageAsyncContext : public qpid::broker::BrokerAsyncContext -{ -public: - MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg, - boost::shared_ptr<SimpleQueue> q); - virtual ~MessageAsyncContext(); - boost::intrusive_ptr<SimpleMessage> getMessage() const; - boost::shared_ptr<SimpleQueue> getQueue() const; - void destroy(); - -private: - boost::intrusive_ptr<SimpleMessage> m_msg; - boost::shared_ptr<SimpleQueue> m_q; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerfTools_asyncPerf_MessageContext_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 6aa477c470..6477696bd6 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -23,13 +23,12 @@ #include "MessageConsumer.h" -#include "DeliveryRecord.h" -#include "SimpleQueue.h" #include "TestOptions.h" -#include "TxnAccept.h" -#include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/TxnBuffer.h" +#include "qpid/broker/SimpleDeliveryRecord.h" +#include "qpid/broker/SimpleQueue.h" +#include "qpid/broker/SimpleTxnAccept.h" +#include "qpid/broker/SimpleTxnBuffer.h" #include <stdint.h> // uint32_t @@ -38,9 +37,9 @@ namespace storePerftools { namespace asyncPerf { MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimpleQueue> queue) : + boost::shared_ptr<qpid::broker::SimpleQueue> queue) : m_perfTestParams(perfTestParams), m_store(store), m_resultQueue(arq), @@ -50,7 +49,7 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, MessageConsumer::~MessageConsumer() {} void -MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) { +MessageConsumer::record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr) { m_unacked.push_back(dr); } @@ -61,9 +60,9 @@ void* MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; uint16_t opsInTxnCnt = 0U; - qpid::broker::TxnBuffer* tb = 0; + qpid::broker::SimpleTxnBuffer* tb = 0; if (useTxns) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } uint32_t msgsPerConsumer = m_perfTestParams.m_numEnqThreadsPerQueue * m_perfTestParams.m_numMsgs / @@ -74,19 +73,19 @@ MessageConsumer::runConsumers() { ++numMsgs; if (useTxns) { // --- Transactional dequeue --- - boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked)); + boost::shared_ptr<qpid::broker::SimpleTxnAccept> ta(new qpid::broker::SimpleTxnAccept(m_unacked)); m_unacked.clear(); tb->enlist(ta); if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { tb->commitLocal(m_store); if (numMsgs < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } opsInTxnCnt = 0U; } } else { // --- Non-transactional dequeue --- - for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { + for (std::deque<boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { (*i)->accept(); } m_unacked.clear(); diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h index b110520889..d5a881f7e0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h @@ -24,44 +24,44 @@ #ifndef tests_storePerftools_asyncPerf_MessageConsumer_h_ #define tests_storePerftools_asyncPerf_MessageConsumer_h_ +#include "qpid/broker/SimpleConsumer.h" + #include "boost/shared_ptr.hpp" #include <deque> namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} namespace broker { class AsyncResultQueue; +class AsyncStore; +class SimpleDeliveryRecord; +class SimpleQueue; }} namespace tests { namespace storePerftools { namespace asyncPerf { -class DeliveryRecord; -class SimpleQueue; class TestOptions; -class MessageConsumer +class MessageConsumer: public qpid::broker::SimpleConsumer { public: MessageConsumer(const TestOptions& perfTestParams, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimpleQueue> queue); + boost::shared_ptr<qpid::broker::SimpleQueue> queue); virtual ~MessageConsumer(); - void record(boost::shared_ptr<DeliveryRecord> dr); + void record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr); void commitComplete(); void* runConsumers(); static void* startConsumers(void* ptr); private: const TestOptions& m_perfTestParams; - qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncStore* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - boost::shared_ptr<SimpleQueue> m_queue; - std::deque<boost::shared_ptr<DeliveryRecord> > m_unacked; + boost::shared_ptr<qpid::broker::SimpleQueue> m_queue; + std::deque<boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> > m_unacked; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp deleted file mode 100644 index 1fa2c087ac..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MessageDeque.cpp - */ - -#include "MessageDeque.h" - -#include "QueuedMessage.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -MessageDeque::MessageDeque() -{} - -MessageDeque::~MessageDeque() -{} - -uint32_t -MessageDeque::size() -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); - return m_messages.size(); -} - -bool -MessageDeque::push(boost::shared_ptr<QueuedMessage>& added) -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); - m_messages.push_back(added); - return false; -} - -bool -MessageDeque::consume(boost::shared_ptr<QueuedMessage>& msg) -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex); - if (!m_messages.empty()) { - msg = m_messages.front(); - m_messages.pop_front(); - return true; - } - return false; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h deleted file mode 100644 index 021015f3e0..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MessageDeque.h - */ - -/* - * This is a copy of qpid::broker::MessageDeque.h, but using the local - * tests::storePerftools::asyncPerf::QueuedMessage class instead of - * qpid::broker::QueuedMessage. - */ - -#ifndef tests_storePerftools_asyncPerf_MessageDeque_h_ -#define tests_storePerftools_asyncPerf_MessageDeque_h_ - -#include "Messages.h" - -#include "qpid/sys/Mutex.h" - -#include <deque> - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MessageDeque : public Messages -{ -public: - MessageDeque(); - virtual ~MessageDeque(); - uint32_t size(); - bool push(boost::shared_ptr<QueuedMessage>& added); - bool consume(boost::shared_ptr<QueuedMessage>& msg); -private: - std::deque<boost::shared_ptr<QueuedMessage> > m_messages; - qpid::sys::Mutex m_msgMutex; - -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_MessageDeque_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 974f3f3981..f88d305a38 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -23,13 +23,12 @@ #include "MessageProducer.h" -#include "SimpleMessage.h" -#include "SimpleQueue.h" #include "TestOptions.h" -#include "TxnPublish.h" -#include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/TxnBuffer.h" +#include "qpid/broker/SimpleMessage.h" +#include "qpid/broker/SimpleQueue.h" +#include "qpid/broker/SimpleTxnBuffer.h" +#include "qpid/broker/SimpleTxnPublish.h" #include <stdint.h> // uint32_t @@ -39,9 +38,9 @@ namespace asyncPerf { MessageProducer::MessageProducer(const TestOptions& perfTestParams, const char* msgData, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimpleQueue> queue) : + boost::shared_ptr<qpid::broker::SimpleQueue> queue) : m_perfTestParams(perfTestParams), m_msgData(msgData), m_store(store), @@ -55,14 +54,14 @@ void* MessageProducer::runProducers() { const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U; uint16_t recsInTxnCnt = 0U; - qpid::broker::TxnBuffer* tb = 0; + qpid::broker::SimpleTxnBuffer* tb = 0; if (useTxns) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) { - boost::intrusive_ptr<SimpleMessage> msg(new SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); + boost::intrusive_ptr<qpid::broker::SimpleMessage> msg(new qpid::broker::SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); if (useTxns) { - boost::shared_ptr<TxnPublish> op(new TxnPublish(msg)); + boost::shared_ptr<qpid::broker::SimpleTxnPublish> op(new qpid::broker::SimpleTxnPublish(msg)); op->deliverTo(m_queue); tb->enlist(op); if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) { @@ -72,7 +71,7 @@ MessageProducer::runProducers() { // transaction until the current commit cycle completes. So use another instance. This // instance should auto-delete when the async commit cycle completes. if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } recsInTxnCnt = 0U; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h index 127408e3db..6f98d03503 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -27,19 +27,17 @@ #include "boost/shared_ptr.hpp" namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} namespace broker { class AsyncResultQueue; -class TxnBuffer; +class AsyncStore; +class SimpleQueue; +class SimpleTxnBuffer; }} namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleQueue; class TestOptions; class MessageProducer @@ -47,18 +45,18 @@ class MessageProducer public: MessageProducer(const TestOptions& perfTestParams, const char* msgData, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr<SimpleQueue> queue); + boost::shared_ptr<qpid::broker::SimpleQueue> queue); virtual ~MessageProducer(); void* runProducers(); static void* startProducers(void* ptr); private: const TestOptions& m_perfTestParams; const char* m_msgData; - qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncStore* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - boost::shared_ptr<SimpleQueue> m_queue; + boost::shared_ptr<qpid::broker::SimpleQueue> m_queue; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/tests/storePerftools/asyncPerf/Messages.h deleted file mode 100644 index c1bfa328ea..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/Messages.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Messages.h - */ - -/* - * This is a copy of qpid::broker::Messages.h, but using the local - * tests::storePerftools::asyncPerf::QueuedMessage class instead of - * qpid::broker::QueuedMessage. - */ - -#ifndef tests_storePerftools_asyncPerf_Messages_h_ -#define tests_storePerftools_asyncPerf_Messages_h_ - -#include <boost/shared_ptr.hpp> -#include <stdint.h> - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class QueuedMessage; - -class Messages -{ -public: - virtual ~Messages() {} - virtual uint32_t size() = 0; - virtual bool push(boost::shared_ptr<QueuedMessage>& added) = 0; - virtual bool consume(boost::shared_ptr<QueuedMessage>& msg) = 0; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_Messages_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 6377cc0d85..c05eb0487d 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -25,7 +25,6 @@ #include "MessageConsumer.h" #include "MessageProducer.h" -#include "SimpleQueue.h" #include "tests/storePerftools/version.h" #include "tests/storePerftools/common/ScopedTimer.h" @@ -34,6 +33,7 @@ #include "qpid/Modules.h" // Use with loading store as module #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/asyncStore/AsyncStoreOptions.h" +#include "qpid/broker/SimpleQueue.h" #include <iomanip> @@ -55,8 +55,7 @@ PerfTest::PerfTest(const TestOptions& to, std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize); } -PerfTest::~PerfTest() -{ +PerfTest::~PerfTest() { m_poller->shutdown(); m_pollingThread.join(); @@ -68,8 +67,7 @@ PerfTest::~PerfTest() } void -PerfTest::run() -{ +PerfTest::run() { if (m_testOpts.m_durable) { prepareStore(); } @@ -113,8 +111,7 @@ PerfTest::run() } void -PerfTest::toStream(std::ostream& os) const -{ +PerfTest::toStream(std::ostream& os) const { m_testOpts.printVals(os); os << std::endl; m_storeOpts.printVals(os); @@ -124,16 +121,15 @@ PerfTest::toStream(std::ostream& os) const // private void -PerfTest::prepareStore() -{ - m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); - m_store->initialize(); +PerfTest::prepareStore() { + qpid::asyncStore::AsyncStoreImpl* s = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); + s->initialize(); + m_store = s; } // private void -PerfTest::destroyStore() -{ +PerfTest::destroyStore() { if (m_store) { delete m_store; } @@ -141,12 +137,11 @@ PerfTest::destroyStore() // private void -PerfTest::prepareQueues() -{ +PerfTest::prepareQueues() { for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - boost::shared_ptr<SimpleQueue> mpq(new SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); + boost::shared_ptr<qpid::broker::SimpleQueue> mpq(new qpid::broker::SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); mpq->asyncCreate(); m_queueList.push_back(mpq); } @@ -154,8 +149,7 @@ PerfTest::prepareQueues() // private void -PerfTest::destroyQueues() -{ +PerfTest::destroyQueues() { while (m_queueList.size() > 0) { m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion); m_queueList.pop_front(); @@ -163,8 +157,7 @@ PerfTest::destroyQueues() } int -runPerfTest(int argc, char** argv) -{ +runPerfTest(int argc, char** argv) { // Load async store module qpid::tryShlib ("asyncStore.so", false); @@ -212,7 +205,6 @@ runPerfTest(int argc, char** argv) // ----------------------------------------------------------------- int -main(int argc, char** argv) -{ +main(int argc, char** argv) { return tests::storePerftools::asyncPerf::runPerfTest(argc, argv); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index e4d99021b5..27d8d08faf 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -37,9 +37,11 @@ namespace qpid { namespace asyncStore { -class AsyncStoreImpl; class AsyncStoreOptions; } +namespace broker { +class SimpleQueue; +} namespace sys { class Poller; }} @@ -48,7 +50,6 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleQueue; class MessageConsumer; class MessageProducer; class TestOptions; @@ -73,8 +74,8 @@ private: boost::shared_ptr<qpid::sys::Poller> m_poller; qpid::sys::Thread m_pollingThread; qpid::broker::AsyncResultQueueImpl m_resultQueue; - qpid::asyncStore::AsyncStoreImpl* m_store; - std::deque<boost::shared_ptr<SimpleQueue> > m_queueList; + qpid::broker::AsyncStore* m_store; + std::deque<boost::shared_ptr<qpid::broker::SimpleQueue> > m_queueList; std::deque<boost::shared_ptr<MessageProducer> > m_producers; std::deque<boost::shared_ptr<MessageConsumer> > m_consumers; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp deleted file mode 100644 index 0d16248c7f..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file QueuedMessage.cpp - */ - -#include "QueuedMessage.h" - -#include "SimpleMessage.h" -#include "SimpleQueue.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -QueuedMessage::QueuedMessage() : - m_queue(0) -{} - -QueuedMessage::QueuedMessage(SimpleQueue* q, - boost::intrusive_ptr<SimpleMessage> msg) : - boost::enable_shared_from_this<QueuedMessage>(), - m_queue(q), - m_msg(msg) -{ - if (m_queue->getStore()) { - m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()); - } -} - -QueuedMessage::QueuedMessage(const QueuedMessage& qm) : - boost::enable_shared_from_this<QueuedMessage>(), - m_queue(qm.m_queue), - m_msg(qm.m_msg), - m_enqHandle(qm.m_enqHandle) -{} - -QueuedMessage::QueuedMessage(QueuedMessage* const qm) : - boost::enable_shared_from_this<QueuedMessage>(), - m_queue(qm->m_queue), - m_msg(qm->m_msg), - m_enqHandle(qm->m_enqHandle) -{} - -QueuedMessage::~QueuedMessage() -{} - -SimpleQueue* -QueuedMessage::getQueue() const -{ - return m_queue; -} - -boost::intrusive_ptr<SimpleMessage> -QueuedMessage::payload() const -{ - return m_msg; -} - -const qpid::broker::EnqueueHandle& -QueuedMessage::enqHandle() const -{ - return m_enqHandle; -} - -qpid::broker::EnqueueHandle& -QueuedMessage::enqHandle() -{ - return m_enqHandle; -} - -void -QueuedMessage::prepareEnqueue(qpid::broker::TxnBuffer* tb) -{ - m_queue->enqueue(tb, shared_from_this()); -} - -void -QueuedMessage::commitEnqueue() -{ - m_queue->process(m_msg); -} - -void -QueuedMessage::abortEnqueue() -{ - m_queue->enqueueAborted(m_msg); -} - -}}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h deleted file mode 100644 index 630fe1aedc..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file QueuedMessage.h - */ - -#ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_ -#define tests_storePerftools_asyncPerf_QueuedMessage_h_ - -#include "qpid/broker/AsyncStore.h" -#include "qpid/broker/EnqueueHandle.h" - -#include <boost/enable_shared_from_this.hpp> -#include <boost/intrusive_ptr.hpp> - -namespace qpid { -namespace broker { - -class TxnHandle; - -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class SimpleMessage; -class SimpleQueue; - -class QueuedMessage : public boost::enable_shared_from_this<QueuedMessage> -{ -public: - QueuedMessage(); - QueuedMessage(SimpleQueue* q, - boost::intrusive_ptr<SimpleMessage> msg); - QueuedMessage(const QueuedMessage& qm); - QueuedMessage(QueuedMessage* const qm); - virtual ~QueuedMessage(); - SimpleQueue* getQueue() const; - boost::intrusive_ptr<SimpleMessage> payload() const; - const qpid::broker::EnqueueHandle& enqHandle() const; - qpid::broker::EnqueueHandle& enqHandle(); - - // --- Transaction handling --- - void prepareEnqueue(qpid::broker::TxnBuffer* tb); - void commitEnqueue(); - void abortEnqueue(); - -private: - SimpleQueue* m_queue; - boost::intrusive_ptr<SimpleMessage> m_msg; - qpid::broker::EnqueueHandle m_enqHandle; -}; - -}}} // namespace tests::storePerfTools - -#endif // tests_storePerftools_asyncPerf_QueuedMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp deleted file mode 100644 index bacf438b9f..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleMessage.cpp - */ - -#include "SimpleMessage.h" - -#include <string.h> // memcpy() - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -SimpleMessage::SimpleMessage(const char* msgData, - const uint32_t msgSize) : - m_persistenceId(0ULL), - m_msg(msgData, static_cast<size_t>(msgSize)), - m_store(0), - m_msgHandle(qpid::broker::MessageHandle()) -{} - -SimpleMessage::SimpleMessage(const char* msgData, - const uint32_t msgSize, - qpid::broker::AsyncStore* store) : - m_persistenceId(0ULL), - m_msg(msgData, static_cast<size_t>(msgSize)), - m_store(store), - m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle()) -{} - -SimpleMessage::~SimpleMessage() -{} - -const qpid::broker::MessageHandle& -SimpleMessage::getHandle() const -{ - return m_msgHandle; -} - -qpid::broker::MessageHandle& -SimpleMessage::getHandle() -{ - return m_msgHandle; -} - -uint64_t -SimpleMessage::contentSize() const -{ - return static_cast<uint64_t>(m_msg.size()); -} - -void -SimpleMessage::setPersistenceId(uint64_t id) const -{ - m_persistenceId = id; -} - -uint64_t -SimpleMessage::getPersistenceId() const -{ - return m_persistenceId; -} - -void -SimpleMessage::encode(qpid::framing::Buffer& buffer) const -{ - buffer.putRawData(m_msg); -} - -uint32_t -SimpleMessage::encodedSize() const -{ - return static_cast<uint32_t>(m_msg.size()); -} - -void -SimpleMessage::allDequeuesComplete() -{} - -uint32_t -SimpleMessage::encodedHeaderSize() const -{ - return 0; -} - -bool -SimpleMessage::isPersistent() const -{ - return m_store != 0; -} - -uint64_t -SimpleMessage::getSize() -{ - return m_msg.size(); -} - -void -SimpleMessage::write(char* target) -{ - ::memcpy(target, m_msg.data(), m_msg.size()); -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h deleted file mode 100644 index 169f5a8959..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleMessage.h - */ - -#ifndef tests_storePerftools_asyncPerf_SimpleMessage_h_ -#define tests_storePerftools_asyncPerf_SimpleMessage_h_ - -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/MessageHandle.h" -#include "qpid/broker/PersistableMessage.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class SimpleMessage: public qpid::broker::PersistableMessage, - public qpid::broker::DataSource -{ -public: - SimpleMessage(const char* msgData, - const uint32_t msgSize); - SimpleMessage(const char* msgData, - const uint32_t msgSize, - qpid::broker::AsyncStore* store); - virtual ~SimpleMessage(); - const qpid::broker::MessageHandle& getHandle() const; - qpid::broker::MessageHandle& getHandle(); - uint64_t contentSize() const; - - // --- Interface Persistable --- - virtual void setPersistenceId(uint64_t id) const; - virtual uint64_t getPersistenceId() const; - virtual void encode(qpid::framing::Buffer& buffer) const; - virtual uint32_t encodedSize() const; - - // --- Interface PersistableMessage --- - virtual void allDequeuesComplete(); - virtual uint32_t encodedHeaderSize() const; - virtual bool isPersistent() const; - - // --- Interface DataSource --- - virtual uint64_t getSize(); // <- same as encodedSize()? - virtual void write(char* target); - -private: - mutable uint64_t m_persistenceId; - const std::string m_msg; - qpid::broker::AsyncStore* m_store; - - qpid::broker::MessageHandle m_msgHandle; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_SimpleMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp deleted file mode 100644 index 06b4e9333f..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ /dev/null @@ -1,457 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleQueue.cpp - */ - -#include "SimpleQueue.h" - -#include "DeliveryRecord.h" -#include "MessageConsumer.h" -#include "MessageDeque.h" -#include "QueuedMessage.h" -#include "SimpleMessage.h" - -#include "qpid/broker/AsyncResultHandle.h" -#include "qpid/broker/QueueAsyncContext.h" -#include "qpid/broker/TxnBuffer.h" - -#include <string.h> // memcpy() - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -//static -qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations - - -SimpleQueue::SimpleQueue(const std::string& name, - const qpid::framing::FieldTable& /*args*/, - qpid::broker::AsyncStore* store, - qpid::broker::AsyncResultQueue& arq) : - qpid::broker::PersistableQueue(), - m_name(name), - m_store(store), - m_resultQueue(arq), - m_asyncOpCounter(0UL), - m_persistenceId(0ULL), - m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. - m_destroyPending(false), - m_destroyed(false), - m_barrier(*this), - m_messages(new MessageDeque()) -{ - if (m_store != 0) { - const qpid::types::Variant::Map qo; - m_queueHandle = m_store->createQueueHandle(m_name, qo); - } -} - -SimpleQueue::~SimpleQueue() {} - -const qpid::broker::QueueHandle& -SimpleQueue::getHandle() const { - return m_queueHandle; -} - -qpid::broker::QueueHandle& -SimpleQueue::getHandle() { - return m_queueHandle; -} - -qpid::broker::AsyncStore* -SimpleQueue::getStore() { - return m_store; -} - -void -SimpleQueue::asyncCreate() { - if (m_store) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - &handleAsyncCreateResult, - &m_resultQueue)); - m_store->submitCreate(m_queueHandle, this, qac); - ++m_asyncOpCounter; - } -} - -//static -void -SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = - boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); - boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - sq->createComplete(qc); - } - } -} - -void -SimpleQueue::asyncDestroy(const bool deleteQueue) -{ - m_destroyPending = true; - if (m_store) { - if (deleteQueue) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - &handleAsyncDestroyResult, - &m_resultQueue)); - m_store->submitDestroy(m_queueHandle, qac); - ++m_asyncOpCounter; - } - m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); - } -} - -//static -void -SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = - boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); - boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - sq->destroyComplete(qc); - } - } -} - -void -SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) { - boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg))); - enqueue(qm); - push(qm); -} - -bool -SimpleQueue::dispatch(MessageConsumer& mc) { - boost::shared_ptr<QueuedMessage> qm; - if (m_messages->consume(qm)) { - boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false)); - mc.record(dr); - return true; - } - return false; -} - -bool -SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) { - return enqueue(0, qm); -} - -bool -SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm) { - ScopedUse u(m_barrier); - if (!u.m_acquired) { - return false; - } - if (qm->payload()->isPersistent() && m_store) { - qm->payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(tb, qm); - } - return false; -} - -bool -SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) { - return dequeue(0, qm); -} - -bool -SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm) { - ScopedUse u(m_barrier); - if (!u.m_acquired) { - return false; - } - if (qm->payload()->isPersistent() && m_store) { - qm->payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(tb, qm); - } - return true; -} - -void -SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) { - push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg))); -} - -void -SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage>) {} - -void -SimpleQueue::encode(qpid::framing::Buffer& buffer) const { - buffer.putShortString(m_name); -} - -uint32_t -SimpleQueue::encodedSize() const { - return m_name.size() + 1; -} - -uint64_t -SimpleQueue::getPersistenceId() const { - return m_persistenceId; -} - -void -SimpleQueue::setPersistenceId(uint64_t persistenceId) const { - m_persistenceId = persistenceId; -} - -void -SimpleQueue::flush() { - //if(m_store) m_store->flush(*this); -} - -const std::string& -SimpleQueue::getName() const { - return m_name; -} - -void -SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) { - if (externalQueueStore != inst && externalQueueStore) - delete externalQueueStore; - externalQueueStore = inst; -} - -uint64_t -SimpleQueue::getSize() { - return m_persistableData.size(); -} - -void -SimpleQueue::write(char* target) { - ::memcpy(target, m_persistableData.data(), m_persistableData.size()); -} - -// --- Members & methods in msg handling path from qpid::Queue --- - -// protected -SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) : - m_parent(q), - m_count(0) -{} - -// protected -bool -SimpleQueue::UsageBarrier::acquire() -{ - qpid::sys::Monitor::ScopedLock l(m_monitor); - if (m_parent.m_destroyed) { - return false; - } else { - ++m_count; - return true; - } -} - -// protected -void SimpleQueue::UsageBarrier::release() -{ - qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); - if (--m_count == 0) { - m_monitor.notifyAll(); - } -} - -// protected -void SimpleQueue::UsageBarrier::destroy() -{ - qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); - m_parent.m_destroyed = true; - while (m_count) { - m_monitor.wait(); - } -} - -// protected -SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) : - m_barrier(b), - m_acquired(m_barrier.acquire()) -{} - -// protected -SimpleQueue::ScopedUse::~ScopedUse() -{ - if (m_acquired) { - m_barrier.release(); - } -} - -// private -void -SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm, - bool /*isRecovery*/) -{ - m_messages->push(qm); -} - -// --- End Members & methods in msg handling path from qpid::Queue --- - -// private -bool -SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm) { - assert(qm.get()); - boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - qm->payload(), - tb, - &handleAsyncEnqueueResult, - &m_resultQueue)); - if (tb) { - tb->incrOpCnt(); - m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac); - } else { - m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac); - } - ++m_asyncOpCounter; - return true; -} - -// private static -void -SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = - boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); - boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - sq->enqueueComplete(qc); - } - } -} - -// private -bool -SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm) -{ - assert(qm.get()); - boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - qm->payload(), - tb, - &handleAsyncDequeueResult, - &m_resultQueue)); - if (tb) { - tb->incrOpCnt(); - m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac); - } else { - m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac); - } - ++m_asyncOpCounter; - return true; -} -// private static -void -SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr<qpid::broker::QueueAsyncContext> qc = - boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext()); - boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - sq->dequeueComplete(qc); - } - } -} - -// private -void -SimpleQueue::destroyCheck(const std::string& opDescr) const { - if (m_destroyPending || m_destroyed) { - std::ostringstream oss; - oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; - throw qpid::Exception(oss.str()); - } -} - -// private -void -SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { - if (qc.get()) { - assert(qc->getQueue().get() == this); - } - --m_asyncOpCounter; -} - -// private -void -SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { - if (qc.get()) { - assert(qc->getQueue().get() == this); - } - --m_asyncOpCounter; -} - -// private -void -SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { - if (qc.get()) { - assert(qc->getQueue().get() == this); - } - --m_asyncOpCounter; - m_destroyed = true; -} - -// private -void -SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { - if (qc.get()) { - assert(qc->getQueue().get() == this); - if (qc->getTxnBuffer()) { // transactional enqueue - qc->getTxnBuffer()->decrOpCnt(); - } - } - --m_asyncOpCounter; -} - -// private -void -SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) { - if (qc.get()) { - assert(qc->getQueue().get() == this); - if (qc->getTxnBuffer()) { // transactional enqueue - qc->getTxnBuffer()->decrOpCnt(); - } - } - --m_asyncOpCounter; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h deleted file mode 100644 index 5f64c9b960..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleQueue.h - */ - -#ifndef tests_storePerftools_asyncPerf_SimpleQueue_h_ -#define tests_storePerftools_asyncPerf_SimpleQueue_h_ - -#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/PersistableQueue.h" -#include "qpid/broker/QueueHandle.h" -#include "qpid/sys/Monitor.h" - -#include <boost/intrusive_ptr.hpp> -#include <boost/shared_ptr.hpp> -#include <boost/enable_shared_from_this.hpp> - -namespace qpid { -namespace broker { -class AsyncResultQueue; -class QueueAsyncContext; -class TxnBuffer; -} -namespace framing { -class FieldTable; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MessageConsumer; -class Messages; -class QueuedMessage; -class SimpleMessage; - -class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>, - public qpid::broker::PersistableQueue, - public qpid::broker::DataSource -{ -public: - SimpleQueue(const std::string& name, - const qpid::framing::FieldTable& args, - qpid::broker::AsyncStore* store, - qpid::broker::AsyncResultQueue& arq); - virtual ~SimpleQueue(); - - const qpid::broker::QueueHandle& getHandle() const; - qpid::broker::QueueHandle& getHandle(); - qpid::broker::AsyncStore* getStore(); - - void asyncCreate(); - static void handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh); - void asyncDestroy(const bool deleteQueue); - static void handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh); - - // --- Methods in msg handling path from qpid::Queue --- - void deliver(boost::intrusive_ptr<SimpleMessage> msg); - bool dispatch(MessageConsumer& mc); - bool enqueue(boost::shared_ptr<QueuedMessage> qm); - bool enqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm); - bool dequeue(boost::shared_ptr<QueuedMessage> qm); - bool dequeue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm); - void process(boost::intrusive_ptr<SimpleMessage> msg); - void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg); - - // --- Interface qpid::broker::Persistable --- - virtual void encode(qpid::framing::Buffer& buffer) const; - virtual uint32_t encodedSize() const; - virtual uint64_t getPersistenceId() const; - virtual void setPersistenceId(uint64_t persistenceId) const; - - // --- Interface qpid::broker::PersistableQueue --- - virtual void flush(); - virtual const std::string& getName() const; - virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); - - // --- Interface qpid::broker::DataStore --- - virtual uint64_t getSize(); - virtual void write(char* target); - -private: - static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations - - const std::string m_name; - qpid::broker::AsyncStore* m_store; - qpid::broker::AsyncResultQueue& m_resultQueue; - qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter! - mutable uint64_t m_persistenceId; - std::string m_persistableData; - qpid::broker::QueueHandle m_queueHandle; - bool m_destroyPending; - bool m_destroyed; - - // --- Members & methods in msg handling path copied from qpid::Queue --- - struct UsageBarrier { - SimpleQueue& m_parent; - uint32_t m_count; - qpid::sys::Monitor m_monitor; - UsageBarrier(SimpleQueue& q); - bool acquire(); - void release(); - void destroy(); - }; - struct ScopedUse { - UsageBarrier& m_barrier; - const bool m_acquired; - ScopedUse(UsageBarrier& b); - ~ScopedUse(); - }; - UsageBarrier m_barrier; - std::auto_ptr<Messages> m_messages; - void push(boost::shared_ptr<QueuedMessage> qm, - bool isRecovery = false); - - // -- Async ops --- - bool asyncEnqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm); - static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh); - bool asyncDequeue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr<QueuedMessage> qm); - static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh); - - // --- Async op counter --- - void destroyCheck(const std::string& opDescr) const; - - // --- Async op completions (called through handleAsyncResult) --- - void createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc); - void flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc); - void destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc); - void enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc); - void dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc); -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_SimpleQueue_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp index 8c1f2976bf..20e9c39f1b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp @@ -62,12 +62,10 @@ TestOptions::TestOptions(const uint32_t numMsgs, doAddOptions(); } -TestOptions::~TestOptions() -{} +TestOptions::~TestOptions() {} void -TestOptions::printVals(std::ostream& os) const -{ +TestOptions::printVals(std::ostream& os) const { tests::storePerftools::common::TestOptions::printVals(os); os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl; os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl; @@ -77,8 +75,7 @@ TestOptions::printVals(std::ostream& os) const // private void -TestOptions::doAddOptions() -{ +TestOptions::doAddOptions() { addOptions() ("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"), "Num enqueus per transaction (0 = no transactions)") diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp index cf6f293494..312fa187b8 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp @@ -32,12 +32,10 @@ TestResult::TestResult(const TestOptions& to) : m_testOpts(to) {} -TestResult::~TestResult() -{} +TestResult::~TestResult() {} void -TestResult::toStream(std::ostream& os) const -{ +TestResult::toStream(std::ostream& os) const { double msgsRate; os << "TEST RESULTS:" << std::endl; os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp deleted file mode 100644 index 375cd568d2..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnAccept.cpp - */ - -#include "TxnAccept.h" - -#include "DeliveryRecord.h" - -#include "qpid/log/Statement.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -TxnAccept::TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops) : - m_ops(ops) -{} - -TxnAccept::~TxnAccept() {} - -// --- Interface TxnOp --- - -bool -TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() { - try { - for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - (*i)->dequeue(tb); - } - return true; - } catch (const std::exception& e) { - QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)"); - } - return false; -} - -void -TxnAccept::commit() throw() { - try { - for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { - (*i)->committed(); - (*i)->setEnded(); - } - } catch (const std::exception& e) { - QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what()); - } catch(...) { - QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)"); - } -} - -void -TxnAccept::rollback() throw() {} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h deleted file mode 100644 index 5d84289965..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnAccept.h - */ - -#ifndef tests_storePerftools_asyncPerf_TxnAccept_h_ -#define tests_storePerftools_asyncPerf_TxnAccept_h_ - -#include "qpid/broker/TxnOp.h" - -#include "boost/shared_ptr.hpp" -#include <deque> - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class DeliveryRecord; - -class TxnAccept: public qpid::broker::TxnOp { -public: - TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops); - virtual ~TxnAccept(); - - // --- Interface TxnOp --- - bool prepare(qpid::broker::TxnBuffer* tb) throw(); - void commit() throw(); - void rollback() throw(); -private: - std::deque<boost::shared_ptr<DeliveryRecord> > m_ops; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_TxnAccept_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp deleted file mode 100644 index cc36a38be7..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnPublish.cpp - */ - -#include "TxnPublish.h" - -#include "QueuedMessage.h" -#include "SimpleMessage.h" -#include "SimpleQueue.h" - -#include "qpid/log/Statement.h" -#include <boost/make_shared.hpp> - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) : - m_msg(msg) -{} - -TxnPublish::~TxnPublish() {} - -bool -TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() { - try { - while (!m_queues.empty()) { - m_queues.front()->prepareEnqueue(tb); - m_prepared.push_back(m_queues.front()); - m_queues.pop_front(); - } - return true; - } catch (const std::exception& e) { - QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)"); - } - return false; -} - -void -TxnPublish::commit() throw() { - try { - for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { - (*i)->commitEnqueue(); - } - } catch (const std::exception& e) { - QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)"); - } -} - -void -TxnPublish::rollback() throw() { - try { - for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { - (*i)->abortEnqueue(); - } - } catch (const std::exception& e) { - QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)"); - } -} - -uint64_t -TxnPublish::contentSize() { - return m_msg->contentSize(); -} - -void -TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) { - m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg))); - m_delivered = true; -} - -SimpleMessage& -TxnPublish::getMessage() { - return *m_msg; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h deleted file mode 100644 index eae9ef9c4c..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnPublish.h - */ - -#ifndef tests_storePerftools_asyncPerf_TxnPublish_h_ -#define tests_storePerftools_asyncPerf_TxnPublish_h_ - -#include "Deliverable.h" - -#include "qpid/broker/TxnOp.h" - -#include <boost/intrusive_ptr.hpp> -#include <boost/shared_ptr.hpp> -#include <list> - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class QueuedMessage; -class SimpleMessage; -class SimpleQueue; - -class TxnPublish : public qpid::broker::TxnOp, - public Deliverable -{ -public: - TxnPublish(boost::intrusive_ptr<SimpleMessage> msg); - virtual ~TxnPublish(); - - // --- Interface TxOp --- - bool prepare(qpid::broker::TxnBuffer* tb) throw(); - void commit() throw(); - void rollback() throw(); - - // --- Interface Deliverable --- - uint64_t contentSize(); - void deliverTo(const boost::shared_ptr<SimpleQueue>& queue); - SimpleMessage& getMessage(); - -private: - boost::intrusive_ptr<SimpleMessage> m_msg; - std::list<boost::shared_ptr<QueuedMessage> > m_queues; - std::list<boost::shared_ptr<QueuedMessage> > m_prepared; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_TxnPublish_h_ |