From 80bfab9ed823cebd9f8f58b559fd32df108bcf7d Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Wed, 1 Aug 2012 14:05:21 +0000 Subject: QPID-3858: WIP: Moving Simple* test classes into the correct namespaces so as to correspond with broker classes. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368006 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/asyncstore.cmake | 11 +- cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 4 +- cpp/src/qpid/asyncStore/AsyncStoreImpl.h | 7 +- cpp/src/qpid/asyncStore/TxnHandleImpl.cpp | 4 +- cpp/src/qpid/asyncStore/TxnHandleImpl.h | 8 +- cpp/src/qpid/broker/AsyncStore.h | 8 +- cpp/src/qpid/broker/QueueAsyncContext.cpp | 6 +- cpp/src/qpid/broker/QueueAsyncContext.h | 8 +- cpp/src/qpid/broker/SimpleConsumer.h | 42 ++ cpp/src/qpid/broker/SimpleDeliverable.cpp | 40 ++ cpp/src/qpid/broker/SimpleDeliverable.h | 53 +++ cpp/src/qpid/broker/SimpleDeliveryRecord.cpp | 92 +++++ cpp/src/qpid/broker/SimpleDeliveryRecord.h | 59 +++ cpp/src/qpid/broker/SimpleMessage.cpp | 108 +++++ cpp/src/qpid/broker/SimpleMessage.h | 73 ++++ cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp | 59 +++ cpp/src/qpid/broker/SimpleMessageAsyncContext.h | 55 +++ cpp/src/qpid/broker/SimpleMessageDeque.cpp | 59 +++ cpp/src/qpid/broker/SimpleMessageDeque.h | 57 +++ cpp/src/qpid/broker/SimpleMessages.h | 52 +++ cpp/src/qpid/broker/SimpleQueue.cpp | 448 ++++++++++++++++++++ cpp/src/qpid/broker/SimpleQueue.h | 155 +++++++ cpp/src/qpid/broker/SimpleQueuedMessage.cpp | 98 +++++ cpp/src/qpid/broker/SimpleQueuedMessage.h | 66 +++ cpp/src/qpid/broker/SimpleTxnAccept.cpp | 73 ++++ cpp/src/qpid/broker/SimpleTxnAccept.h | 52 +++ cpp/src/qpid/broker/SimpleTxnBuffer.cpp | 254 ++++++++++++ cpp/src/qpid/broker/SimpleTxnBuffer.h | 89 ++++ cpp/src/qpid/broker/SimpleTxnOp.h | 44 ++ cpp/src/qpid/broker/SimpleTxnPublish.cpp | 101 +++++ cpp/src/qpid/broker/SimpleTxnPublish.h | 67 +++ cpp/src/qpid/broker/TxnAsyncContext.cpp | 4 +- cpp/src/qpid/broker/TxnAsyncContext.h | 12 +- cpp/src/qpid/broker/TxnBuffer.cpp | 254 ------------ cpp/src/qpid/broker/TxnBuffer.h | 89 ---- cpp/src/qpid/broker/TxnOp.h | 44 -- cpp/src/tests/asyncstore.cmake | 18 +- .../tests/storePerftools/asyncPerf/Deliverable.cpp | 43 -- .../tests/storePerftools/asyncPerf/Deliverable.h | 54 --- .../storePerftools/asyncPerf/DeliveryRecord.cpp | 102 ----- .../storePerftools/asyncPerf/DeliveryRecord.h | 66 --- .../asyncPerf/MessageAsyncContext.cpp | 64 --- .../storePerftools/asyncPerf/MessageAsyncContext.h | 56 --- .../storePerftools/asyncPerf/MessageConsumer.cpp | 25 +- .../storePerftools/asyncPerf/MessageConsumer.h | 24 +- .../storePerftools/asyncPerf/MessageDeque.cpp | 65 --- .../tests/storePerftools/asyncPerf/MessageDeque.h | 59 --- .../storePerftools/asyncPerf/MessageProducer.cpp | 23 +- .../storePerftools/asyncPerf/MessageProducer.h | 16 +- cpp/src/tests/storePerftools/asyncPerf/Messages.h | 53 --- .../tests/storePerftools/asyncPerf/PerfTest.cpp | 36 +- cpp/src/tests/storePerftools/asyncPerf/PerfTest.h | 9 +- .../storePerftools/asyncPerf/QueuedMessage.cpp | 109 ----- .../tests/storePerftools/asyncPerf/QueuedMessage.h | 74 ---- .../storePerftools/asyncPerf/SimpleMessage.cpp | 122 ------ .../tests/storePerftools/asyncPerf/SimpleMessage.h | 74 ---- .../tests/storePerftools/asyncPerf/SimpleQueue.cpp | 457 --------------------- .../tests/storePerftools/asyncPerf/SimpleQueue.h | 158 ------- .../tests/storePerftools/asyncPerf/TestOptions.cpp | 9 +- .../tests/storePerftools/asyncPerf/TestResult.cpp | 6 +- .../tests/storePerftools/asyncPerf/TxnAccept.cpp | 74 ---- cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h | 53 --- .../tests/storePerftools/asyncPerf/TxnPublish.cpp | 102 ----- .../tests/storePerftools/asyncPerf/TxnPublish.h | 68 --- 64 files changed, 2309 insertions(+), 2365 deletions(-) create mode 100644 cpp/src/qpid/broker/SimpleConsumer.h create mode 100644 cpp/src/qpid/broker/SimpleDeliverable.cpp create mode 100644 cpp/src/qpid/broker/SimpleDeliverable.h create mode 100644 cpp/src/qpid/broker/SimpleDeliveryRecord.cpp create mode 100644 cpp/src/qpid/broker/SimpleDeliveryRecord.h create mode 100644 cpp/src/qpid/broker/SimpleMessage.cpp create mode 100644 cpp/src/qpid/broker/SimpleMessage.h create mode 100644 cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp create mode 100644 cpp/src/qpid/broker/SimpleMessageAsyncContext.h create mode 100644 cpp/src/qpid/broker/SimpleMessageDeque.cpp create mode 100644 cpp/src/qpid/broker/SimpleMessageDeque.h create mode 100644 cpp/src/qpid/broker/SimpleMessages.h create mode 100644 cpp/src/qpid/broker/SimpleQueue.cpp create mode 100644 cpp/src/qpid/broker/SimpleQueue.h create mode 100644 cpp/src/qpid/broker/SimpleQueuedMessage.cpp create mode 100644 cpp/src/qpid/broker/SimpleQueuedMessage.h create mode 100644 cpp/src/qpid/broker/SimpleTxnAccept.cpp create mode 100644 cpp/src/qpid/broker/SimpleTxnAccept.h create mode 100644 cpp/src/qpid/broker/SimpleTxnBuffer.cpp create mode 100644 cpp/src/qpid/broker/SimpleTxnBuffer.h create mode 100644 cpp/src/qpid/broker/SimpleTxnOp.h create mode 100644 cpp/src/qpid/broker/SimpleTxnPublish.cpp create mode 100644 cpp/src/qpid/broker/SimpleTxnPublish.h delete mode 100644 cpp/src/qpid/broker/TxnBuffer.cpp delete mode 100644 cpp/src/qpid/broker/TxnBuffer.h delete mode 100644 cpp/src/qpid/broker/TxnOp.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/Deliverable.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/Messages.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp delete mode 100644 cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h diff --git a/cpp/src/asyncstore.cmake b/cpp/src/asyncstore.cmake index 5656bc8cc4..6171ed5505 100644 --- a/cpp/src/asyncstore.cmake +++ b/cpp/src/asyncstore.cmake @@ -64,8 +64,17 @@ set (asyncStore_SOURCES qpid/broker/MessageHandle.cpp qpid/broker/QueueAsyncContext.cpp qpid/broker/QueueHandle.cpp + qpid/broker/SimpleDeliverable.cpp + qpid/broker/SimpleDeliveryRecord.cpp + qpid/broker/SimpleMessage.cpp + qpid/broker/SimpleMessageAsyncContext.cpp + qpid/broker/SimpleMessageDeque.cpp + qpid/broker/SimpleQueue.cpp + qpid/broker/SimpleQueuedMessage.cpp + qpid/broker/SimpleTxnAccept.cpp + qpid/broker/SimpleTxnBuffer.cpp + qpid/broker/SimpleTxnPublish.cpp qpid/broker/TxnAsyncContext.cpp - qpid/broker/TxnBuffer.cpp qpid/broker/TxnHandle.cpp ) diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index aa66e7adb8..2ee1d23025 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -75,7 +75,7 @@ AsyncStoreImpl::createTxnHandle() } qpid::broker::TxnHandle -AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb) +AsyncStoreImpl::createTxnHandle(qpid::broker::SimpleTxnBuffer* tb) { return qpid::broker::TxnHandle(new TxnHandleImpl(tb)); } @@ -90,7 +90,7 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid, qpid::broker::TxnHandle AsyncStoreImpl::createTxnHandle(const std::string& xid, const bool tpcFlag, - qpid::broker::TxnBuffer* tb) + qpid::broker::SimpleTxnBuffer* tb) { return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tpcFlag, tb)); } diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index eb3f090ad7..40a7552a68 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -42,8 +42,7 @@ class Poller; namespace asyncStore { -class AsyncStoreImpl : public qpid::broker::AsyncTransactionalStore, - public qpid::broker::AsyncStore +class AsyncStoreImpl : public qpid::broker::AsyncStore { public: AsyncStoreImpl(boost::shared_ptr poller, @@ -59,12 +58,12 @@ public: // --- Interface from AsyncTransactionalStore --- qpid::broker::TxnHandle createTxnHandle(); - qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb); + qpid::broker::TxnHandle createTxnHandle(qpid::broker::SimpleTxnBuffer* tb); qpid::broker::TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag); qpid::broker::TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag, - qpid::broker::TxnBuffer* tb); + qpid::broker::SimpleTxnBuffer* tb); void submitPrepare(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr TxnCtxt); diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index dd644b29bd..50dce1b2af 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -31,7 +31,7 @@ TxnHandleImpl::TxnHandleImpl() : m_txnBuffer(0) {} -TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) : +TxnHandleImpl::TxnHandleImpl(qpid::broker::SimpleTxnBuffer* tb) : m_tpcFlag(false), m_txnBuffer(tb) {} @@ -44,7 +44,7 @@ TxnHandleImpl::TxnHandleImpl(const std::string& xid, const bool tpcFlag) : TxnHandleImpl::TxnHandleImpl(const std::string& xid, const bool tpcFlag, - qpid::broker::TxnBuffer* tb) : + qpid::broker::SimpleTxnBuffer* tb) : m_xid(xid), m_tpcFlag(tpcFlag), m_txnBuffer(tb) diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h index e1f8afff3e..ce23665d5b 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -33,7 +33,7 @@ namespace qpid { namespace broker { -class TxnBuffer; +class SimpleTxnBuffer; } namespace asyncStore { @@ -42,9 +42,9 @@ class TxnHandleImpl : public virtual qpid::RefCounted { public: TxnHandleImpl(); - TxnHandleImpl(qpid::broker::TxnBuffer* tb); + TxnHandleImpl(qpid::broker::SimpleTxnBuffer* tb); TxnHandleImpl(const std::string& xid, const bool tpcFlag); - TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::TxnBuffer* tb); + TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::SimpleTxnBuffer* tb); virtual ~TxnHandleImpl(); const std::string& getXid() const; bool is2pc() const; @@ -52,7 +52,7 @@ public: private: std::string m_xid; bool m_tpcFlag; - qpid::broker::TxnBuffer* const m_txnBuffer; + qpid::broker::SimpleTxnBuffer* const m_txnBuffer; }; }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 6f1c02e059..7009565a7c 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -70,19 +70,19 @@ class TxnHandle; class QueueAsyncContext; class TpcTxnAsyncContext; class TxnAsyncContext; -class TxnBuffer; +class SimpleTxnBuffer; class AsyncTransactionalStore { public: virtual ~AsyncTransactionalStore() {} virtual TxnHandle createTxnHandle() = 0; - virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0; + virtual TxnHandle createTxnHandle(SimpleTxnBuffer* tb) = 0; virtual TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag) = 0; virtual TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag, - TxnBuffer* tb) = 0; + SimpleTxnBuffer* tb) = 0; virtual void submitPrepare(TxnHandle&, boost::shared_ptr) = 0; // Distributed txns only @@ -94,7 +94,7 @@ public: }; // Subclassed by store: -class AsyncStore { +class AsyncStore : public AsyncTransactionalStore { public: virtual ~AsyncStore() {} diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp index 4bd2d271eb..02eb2e9546 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.cpp +++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp @@ -48,7 +48,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, {} QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), @@ -61,7 +61,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, boost::intrusive_ptr msg, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), @@ -89,7 +89,7 @@ QueueAsyncContext::getMessage() const return m_msg; } -TxnBuffer* +SimpleTxnBuffer* QueueAsyncContext::getTxnBuffer() const { return m_tb; } diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h index e9ba2ebbac..8657922377 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.h +++ b/cpp/src/qpid/broker/QueueAsyncContext.h @@ -52,18 +52,18 @@ public: AsyncResultCallback rcb, AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr q, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr q, boost::intrusive_ptr msg, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); virtual ~QueueAsyncContext(); boost::shared_ptr getQueue() const; boost::intrusive_ptr getMessage() const; - TxnBuffer* getTxnBuffer() const; + SimpleTxnBuffer* getTxnBuffer() const; AsyncResultQueue* getAsyncResultQueue() const; AsyncResultCallback getAsyncResultCallback() const; void invokeCallback(const AsyncResultHandle* const arh) const; @@ -72,7 +72,7 @@ public: private: boost::shared_ptr m_q; boost::intrusive_ptr m_msg; - TxnBuffer* m_tb; + SimpleTxnBuffer* m_tb; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; diff --git a/cpp/src/qpid/broker/SimpleConsumer.h b/cpp/src/qpid/broker/SimpleConsumer.h new file mode 100644 index 0000000000..6601c65a42 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleConsumer.h @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleConsumer.h + */ + +#ifndef qpid_broker_SimpleConsumer_h_ +#define qpid_broker_SimpleConsumer_h_ + +#include + +namespace qpid { +namespace broker { +class SimpleDeliveryRecord; + +class SimpleConsumer { +public: + virtual ~SimpleConsumer() {} + virtual void commitComplete() = 0; + virtual void record(boost::shared_ptr dr) = 0; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleConsumer_h_ diff --git a/cpp/src/qpid/broker/SimpleDeliverable.cpp b/cpp/src/qpid/broker/SimpleDeliverable.cpp new file mode 100644 index 0000000000..7037a377c5 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliverable.cpp @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliverable.cpp + */ + +#include "SimpleDeliverable.h" + +namespace qpid { +namespace broker { + +SimpleDeliverable::SimpleDeliverable() : + m_delivered(false) +{} + +SimpleDeliverable::~SimpleDeliverable() {} + +bool +SimpleDeliverable::isDelivered() const { + return m_delivered; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleDeliverable.h b/cpp/src/qpid/broker/SimpleDeliverable.h new file mode 100644 index 0000000000..6441e14841 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliverable.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliverable.h + */ + +#ifndef qpid_broker_SimpleDeliverable_h_ +#define qpid_broker_SimpleDeliverable_h_ + +#include +#include // uint64_t + +namespace qpid { +namespace broker { + +class SimpleMessage; +class SimpleQueue; + +class SimpleDeliverable +{ +public: + SimpleDeliverable(); + virtual ~SimpleDeliverable(); + + virtual uint64_t contentSize() = 0; + virtual void deliverTo(const boost::shared_ptr& queue) = 0; + virtual SimpleMessage& getMessage() = 0; + virtual bool isDelivered() const; + +protected: + bool m_delivered; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleDeliverable_h_ diff --git a/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp new file mode 100644 index 0000000000..b71df6975b --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliveryRecord.cpp + */ + +#include "SimpleDeliveryRecord.h" + +#include "SimpleConsumer.h" +#include "SimpleMessage.h" +#include "SimpleQueue.h" +#include "SimpleQueuedMessage.h" + +namespace qpid { +namespace broker { + +SimpleDeliveryRecord::SimpleDeliveryRecord(boost::shared_ptr qm, + SimpleConsumer& sc, + bool accepted) : + m_queuedMessage(qm), + m_msgConsumer(sc), + m_accepted(accepted), + m_ended(accepted) +{} + +SimpleDeliveryRecord::~SimpleDeliveryRecord() {} + +bool +SimpleDeliveryRecord::accept() { + if (!m_ended) { + m_queuedMessage->getQueue()->dequeue(m_queuedMessage); + m_accepted = true; + setEnded(); + } + return isRedundant(); +} + +bool +SimpleDeliveryRecord::isAccepted() const { + return m_accepted; +} + +bool +SimpleDeliveryRecord::setEnded() { + m_ended = true; + m_queuedMessage->payload() = boost::intrusive_ptr(0); + return isRedundant(); +} + +bool +SimpleDeliveryRecord::isEnded() const { + return m_ended; +} + +bool +SimpleDeliveryRecord::isRedundant() const { + return m_ended; +} + +void +SimpleDeliveryRecord::dequeue(qpid::broker::SimpleTxnBuffer* tb) { + m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage); +} + +void +SimpleDeliveryRecord::committed() const { + m_msgConsumer.commitComplete(); +} + +boost::shared_ptr +SimpleDeliveryRecord::getQueuedMessage() const { + return m_queuedMessage; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleDeliveryRecord.h b/cpp/src/qpid/broker/SimpleDeliveryRecord.h new file mode 100644 index 0000000000..622ce578d7 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.h @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliveryRecord.h + */ + +#ifndef qpid_broker_SimpleDeliveryRecord_h_ +#define qpid_broker_SimpleDeliveryRecord_h_ + +#include + +namespace qpid { +namespace broker { + +class SimpleConsumer; +class SimpleQueuedMessage; +class SimpleTxnBuffer; + +class SimpleDeliveryRecord { +public: + SimpleDeliveryRecord(boost::shared_ptr qm, + SimpleConsumer& sc, + bool accepted); + virtual ~SimpleDeliveryRecord(); + bool accept(); + bool isAccepted() const; + bool setEnded(); + bool isEnded() const; + bool isRedundant() const; + void dequeue(qpid::broker::SimpleTxnBuffer* tb); + void committed() const; + boost::shared_ptr getQueuedMessage() const; +private: + boost::shared_ptr m_queuedMessage; + SimpleConsumer& m_msgConsumer; + bool m_accepted : 1; + bool m_ended : 1; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleDeliveryRecord_h_ diff --git a/cpp/src/qpid/broker/SimpleMessage.cpp b/cpp/src/qpid/broker/SimpleMessage.cpp new file mode 100644 index 0000000000..1239533edf --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessage.cpp @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessage.cpp + */ + +#include "SimpleMessage.h" + +#include // memcpy() + +namespace qpid { +namespace broker { + +SimpleMessage::SimpleMessage(const char* msgData, + const uint32_t msgSize) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast(msgSize)), + m_store(0), + m_msgHandle(MessageHandle()) +{} + +SimpleMessage::SimpleMessage(const char* msgData, + const uint32_t msgSize, + AsyncStore* store) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast(msgSize)), + m_store(store), + m_msgHandle(store ? store->createMessageHandle(this) : MessageHandle()) +{} + +SimpleMessage::~SimpleMessage() {} + +const MessageHandle& +SimpleMessage::getHandle() const { + return m_msgHandle; +} + +MessageHandle& +SimpleMessage::getHandle() { + return m_msgHandle; +} + +uint64_t +SimpleMessage::contentSize() const { + return static_cast(m_msg.size()); +} + +void +SimpleMessage::setPersistenceId(uint64_t id) const { + m_persistenceId = id; +} + +uint64_t +SimpleMessage::getPersistenceId() const { + return m_persistenceId; +} + +void +SimpleMessage::encode(qpid::framing::Buffer& buffer) const { + buffer.putRawData(m_msg); +} + +uint32_t +SimpleMessage::encodedSize() const { + return static_cast(m_msg.size()); +} + +void +SimpleMessage::allDequeuesComplete() {} + +uint32_t +SimpleMessage::encodedHeaderSize() const { + return 0; +} + +bool +SimpleMessage::isPersistent() const { + return m_store != 0; +} + +uint64_t +SimpleMessage::getSize() { + return m_msg.size(); +} + +void +SimpleMessage::write(char* target) { + ::memcpy(target, m_msg.data(), m_msg.size()); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleMessage.h b/cpp/src/qpid/broker/SimpleMessage.h new file mode 100644 index 0000000000..edfaa8d13b --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessage.h @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessage.h + */ + +#ifndef qpid_broker_SimpleMessage_h_ +#define qpid_broker_SimpleMessage_h_ + +#include "AsyncStore.h" // DataSource +#include "MessageHandle.h" +#include "PersistableMessage.h" + +namespace qpid { +namespace broker { + +class SimpleMessage: public PersistableMessage, + public DataSource +{ +public: + SimpleMessage(const char* msgData, + const uint32_t msgSize); + SimpleMessage(const char* msgData, + const uint32_t msgSize, + AsyncStore* store); + virtual ~SimpleMessage(); + const MessageHandle& getHandle() const; + MessageHandle& getHandle(); + uint64_t contentSize() const; + + // --- Interface Persistable --- + virtual void setPersistenceId(uint64_t id) const; + virtual uint64_t getPersistenceId() const; + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + + // --- Interface PersistableMessage --- + virtual void allDequeuesComplete(); + virtual uint32_t encodedHeaderSize() const; + virtual bool isPersistent() const; + + // --- Interface DataSource --- + virtual uint64_t getSize(); // <- same as encodedSize()? + virtual void write(char* target); + +private: + mutable uint64_t m_persistenceId; + const std::string m_msg; + AsyncStore* m_store; + + MessageHandle m_msgHandle; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessage_h_ diff --git a/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp new file mode 100644 index 0000000000..a88258f5bc --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageAsyncContext.cpp + */ + +#include "SimpleMessageAsyncContext.h" + +#include "SimpleMessage.h" + +#include + +namespace qpid { +namespace broker { + +SimpleMessageAsyncContext::SimpleMessageAsyncContext(boost::intrusive_ptr msg, + boost::shared_ptr q) : + m_msg(msg), + m_q(q) +{ + assert(m_msg.get() != 0); + assert(m_q.get() != 0); +} + +SimpleMessageAsyncContext::~SimpleMessageAsyncContext() {} + +boost::intrusive_ptr +SimpleMessageAsyncContext::getMessage() const { + return m_msg; +} + +boost::shared_ptr +SimpleMessageAsyncContext::getQueue() const { + return m_q; +} + +void +SimpleMessageAsyncContext::destroy() { + delete this; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleMessageAsyncContext.h b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h new file mode 100644 index 0000000000..e3975e790e --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageAsyncContext.h + */ + +#ifndef qpid_broker_SimpleMessageAsyncContext_h_ +#define qpid_broker_SimpleMessageAsyncContext_h_ + +#include "AsyncStore.h" // BrokerAsyncContext + +#include +#include + +namespace qpid { +namespace broker { + +class SimpleMessage; +class SimpleQueue; + +class SimpleMessageAsyncContext : public BrokerAsyncContext +{ +public: + SimpleMessageAsyncContext(boost::intrusive_ptr msg, + boost::shared_ptr q); + virtual ~SimpleMessageAsyncContext(); + boost::intrusive_ptr getMessage() const; + boost::shared_ptr getQueue() const; + void destroy(); + +private: + boost::intrusive_ptr m_msg; + boost::shared_ptr m_q; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessageAsyncContext_h_ diff --git a/cpp/src/qpid/broker/SimpleMessageDeque.cpp b/cpp/src/qpid/broker/SimpleMessageDeque.cpp new file mode 100644 index 0000000000..0aadcfd94a --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageDeque.cpp @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageDeque.cpp + */ + +#include "SimpleMessageDeque.h" + +#include "SimpleQueuedMessage.h" + +namespace qpid { +namespace broker { + +SimpleMessageDeque::SimpleMessageDeque() {} + +SimpleMessageDeque::~SimpleMessageDeque() {} + +uint32_t +SimpleMessageDeque::size() { + qpid::sys::ScopedLock l(m_msgMutex); + return m_messages.size(); +} + +bool +SimpleMessageDeque::push(boost::shared_ptr& added) { + qpid::sys::ScopedLock l(m_msgMutex); + m_messages.push_back(added); + return false; +} + +bool +SimpleMessageDeque::consume(boost::shared_ptr& msg) { + qpid::sys::ScopedLock l(m_msgMutex); + if (!m_messages.empty()) { + msg = m_messages.front(); + m_messages.pop_front(); + return true; + } + return false; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleMessageDeque.h b/cpp/src/qpid/broker/SimpleMessageDeque.h new file mode 100644 index 0000000000..5db0755a43 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageDeque.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageDeque.h + */ + +/* + * This is a copy of qpid::broker::MessageDeque.h, but using the local + * SimpleQueuedMessage class instead of QueuedMessage. + */ + +#ifndef qpid_broker_SimpleMessageDeque_h_ +#define qpid_broker_SimpleMessageDeque_h_ + +#include "SimpleMessages.h" + +#include "qpid/sys/Mutex.h" + +#include + +namespace qpid { +namespace broker { + +class SimpleMessageDeque : public SimpleMessages +{ +public: + SimpleMessageDeque(); + virtual ~SimpleMessageDeque(); + uint32_t size(); + bool push(boost::shared_ptr& added); + bool consume(boost::shared_ptr& msg); +private: + std::deque > m_messages; + qpid::sys::Mutex m_msgMutex; + +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessageDeque_h_ diff --git a/cpp/src/qpid/broker/SimpleMessages.h b/cpp/src/qpid/broker/SimpleMessages.h new file mode 100644 index 0000000000..2a40859032 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessages.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessages.h + */ + +/* + * This is a copy of qpid::broker::Messages.h, but using the local + * tests::storePerftools::asyncPerf::QueuedMessage class instead of + * qpid::broker::QueuedMessage. + */ + +#ifndef qpid_broker_SimpleMessages_h_ +#define qpid_broker_SimpleMessages_h_ + +#include +#include + +namespace qpid { +namespace broker { + +class SimpleQueuedMessage; + +class SimpleMessages +{ +public: + virtual ~SimpleMessages() {} + virtual uint32_t size() = 0; + virtual bool push(boost::shared_ptr& added) = 0; + virtual bool consume(boost::shared_ptr& msg) = 0; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessages_h_ diff --git a/cpp/src/qpid/broker/SimpleQueue.cpp b/cpp/src/qpid/broker/SimpleQueue.cpp new file mode 100644 index 0000000000..5cd8841f94 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueue.cpp @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueue.cpp + */ + +#include "SimpleQueue.h" + +#include "AsyncResultHandle.h" +#include "QueueAsyncContext.h" +#include "SimpleConsumer.h" +#include "SimpleDeliveryRecord.h" +#include "SimpleMessage.h" +#include "SimpleMessageDeque.h" +#include "SimpleQueuedMessage.h" +#include "SimpleTxnBuffer.h" + +#include // memcpy() + +namespace qpid { +namespace broker { + +//static +TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations + + +SimpleQueue::SimpleQueue(const std::string& name, + const qpid::framing::FieldTable& /*args*/, + AsyncStore* store, + AsyncResultQueue& arq) : + PersistableQueue(), + m_name(name), + m_store(store), + m_resultQueue(arq), + m_asyncOpCounter(0UL), + m_persistenceId(0ULL), + m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. + m_destroyPending(false), + m_destroyed(false), + m_barrier(*this), + m_messages(new SimpleMessageDeque()) +{ + if (m_store != 0) { + const qpid::types::Variant::Map qo; + m_queueHandle = m_store->createQueueHandle(m_name, qo); + } +} + +SimpleQueue::~SimpleQueue() {} + +const QueueHandle& +SimpleQueue::getHandle() const { + return m_queueHandle; +} + +QueueHandle& +SimpleQueue::getHandle() { + return m_queueHandle; +} + +AsyncStore* +SimpleQueue::getStore() { + return m_store; +} + +void +SimpleQueue::asyncCreate() { + if (m_store) { + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + &handleAsyncCreateResult, + &m_resultQueue)); + m_store->submitCreate(m_queueHandle, this, qac); + ++m_asyncOpCounter; + } +} + +//static +void +SimpleQueue::handleAsyncCreateResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr qc = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->createComplete(qc); + } + } +} + +void +SimpleQueue::asyncDestroy(const bool deleteQueue) +{ + m_destroyPending = true; + if (m_store) { + if (deleteQueue) { + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + &handleAsyncDestroyResult, + &m_resultQueue)); + m_store->submitDestroy(m_queueHandle, qac); + ++m_asyncOpCounter; + } + m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); + } +} + +//static +void +SimpleQueue::handleAsyncDestroyResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr qc = + boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->destroyComplete(qc); + } + } +} + +void +SimpleQueue::deliver(boost::intrusive_ptr msg) { + boost::shared_ptr qm(boost::shared_ptr(new SimpleQueuedMessage(this, msg))); + enqueue(qm); + push(qm); +} + +bool +SimpleQueue::dispatch(SimpleConsumer& sc) { + boost::shared_ptr qm; + if (m_messages->consume(qm)) { + boost::shared_ptr dr(new SimpleDeliveryRecord(qm, sc, false)); + sc.record(dr); + return true; + } + return false; +} + +bool +SimpleQueue::enqueue(boost::shared_ptr qm) { + return enqueue(0, qm); +} + +bool +SimpleQueue::enqueue(SimpleTxnBuffer* tb, + boost::shared_ptr qm) { + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->enqueueAsync(shared_from_this(), m_store); + return asyncEnqueue(tb, qm); + } + return false; +} + +bool +SimpleQueue::dequeue(boost::shared_ptr qm) { + return dequeue(0, qm); +} + +bool +SimpleQueue::dequeue(SimpleTxnBuffer* tb, + boost::shared_ptr qm) { + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->dequeueAsync(shared_from_this(), m_store); + return asyncDequeue(tb, qm); + } + return true; +} + +void +SimpleQueue::process(boost::intrusive_ptr msg) { + push(boost::shared_ptr(new SimpleQueuedMessage(this, msg))); +} + +void +SimpleQueue::enqueueAborted(boost::intrusive_ptr) {} + +void +SimpleQueue::encode(qpid::framing::Buffer& buffer) const { + buffer.putShortString(m_name); +} + +uint32_t +SimpleQueue::encodedSize() const { + return m_name.size() + 1; +} + +uint64_t +SimpleQueue::getPersistenceId() const { + return m_persistenceId; +} + +void +SimpleQueue::setPersistenceId(uint64_t persistenceId) const { + m_persistenceId = persistenceId; +} + +void +SimpleQueue::flush() { + //if(m_store) m_store->flush(*this); +} + +const std::string& +SimpleQueue::getName() const { + return m_name; +} + +void +SimpleQueue::setExternalQueueStore(ExternalQueueStore* inst) { + if (externalQueueStore != inst && externalQueueStore) + delete externalQueueStore; + externalQueueStore = inst; +} + +uint64_t +SimpleQueue::getSize() { + return m_persistableData.size(); +} + +void +SimpleQueue::write(char* target) { + ::memcpy(target, m_persistableData.data(), m_persistableData.size()); +} + +// --- Members & methods in msg handling path from qpid::Queue --- + +// protected +SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) : + m_parent(q), + m_count(0) +{} + +// protected +bool +SimpleQueue::UsageBarrier::acquire() { + qpid::sys::Monitor::ScopedLock l(m_monitor); + if (m_parent.m_destroyed) { + return false; + } else { + ++m_count; + return true; + } +} + +// protected +void SimpleQueue::UsageBarrier::release() { + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + if (--m_count == 0) { + m_monitor.notifyAll(); + } +} + +// protected +void SimpleQueue::UsageBarrier::destroy() { + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + m_parent.m_destroyed = true; + while (m_count) { + m_monitor.wait(); + } +} + +// protected +SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) : + m_barrier(b), + m_acquired(m_barrier.acquire()) +{} + +// protected +SimpleQueue::ScopedUse::~ScopedUse() { + if (m_acquired) { + m_barrier.release(); + } +} + +// private +void +SimpleQueue::push(boost::shared_ptr qm, + bool /*isRecovery*/) { + m_messages->push(qm); +} + +// --- End Members & methods in msg handling path from qpid::Queue --- + +// private +bool +SimpleQueue::asyncEnqueue(SimpleTxnBuffer* tb, + boost::shared_ptr qm) { + assert(qm.get()); + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + qm->payload(), + tb, + &handleAsyncEnqueueResult, + &m_resultQueue)); + if (tb) { + tb->incrOpCnt(); + m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac); + } + ++m_asyncOpCounter; + return true; +} + +// private static +void +SimpleQueue::handleAsyncEnqueueResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr qc = + boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->enqueueComplete(qc); + } + } +} + +// private +bool +SimpleQueue::asyncDequeue(SimpleTxnBuffer* tb, + boost::shared_ptr qm) { + assert(qm.get()); + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + qm->payload(), + tb, + &handleAsyncDequeueResult, + &m_resultQueue)); + if (tb) { + tb->incrOpCnt(); + m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac); + } + ++m_asyncOpCounter; + return true; +} + +// private static +void +SimpleQueue::handleAsyncDequeueResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr qc = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->dequeueComplete(qc); + } + } +} + +// private +void +SimpleQueue::destroyCheck(const std::string& opDescr) const { + if (m_destroyPending || m_destroyed) { + std::ostringstream oss; + oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; + throw qpid::Exception(oss.str()); + } +} + +// private +void +SimpleQueue::createComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::flushComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::destroyComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } + --m_asyncOpCounter; + m_destroyed = true; +} + +// private +void +SimpleQueue::enqueueComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } + } + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::dequeueComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } + } + --m_asyncOpCounter; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleQueue.h b/cpp/src/qpid/broker/SimpleQueue.h new file mode 100644 index 0000000000..c2f21076cd --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueue.h @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueue.h + */ + +#ifndef qpid_broker_SimpleQueue_h_ +#define qpid_broker_SimpleQueue_h_ + +#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter +#include "qpid/broker/AsyncStore.h" // DataSource +#include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/QueueHandle.h" +#include "qpid/sys/Monitor.h" + +#include +#include +#include + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace broker { + +class AsyncResultQueue; +class QueueAsyncContext; +class SimpleConsumer; +class SimpleMessages; +class SimpleQueuedMessage; +class SimpleMessage; +class SimpleTxnBuffer; + +class SimpleQueue : public boost::enable_shared_from_this, + public PersistableQueue, + public DataSource +{ +public: + SimpleQueue(const std::string& name, + const qpid::framing::FieldTable& args, + AsyncStore* store, + AsyncResultQueue& arq); + virtual ~SimpleQueue(); + + const QueueHandle& getHandle() const; + QueueHandle& getHandle(); + AsyncStore* getStore(); + + void asyncCreate(); + static void handleAsyncCreateResult(const AsyncResultHandle* const arh); + void asyncDestroy(const bool deleteQueue); + static void handleAsyncDestroyResult(const AsyncResultHandle* const arh); + + // --- Methods in msg handling path from qpid::Queue --- + void deliver(boost::intrusive_ptr msg); + bool dispatch(SimpleConsumer& sc); + bool enqueue(boost::shared_ptr qm); + bool enqueue(SimpleTxnBuffer* tb, + boost::shared_ptr qm); + bool dequeue(boost::shared_ptr qm); + bool dequeue(SimpleTxnBuffer* tb, + boost::shared_ptr qm); + void process(boost::intrusive_ptr msg); + void enqueueAborted(boost::intrusive_ptr msg); + + // --- Interface qpid::broker::Persistable --- + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + virtual uint64_t getPersistenceId() const; + virtual void setPersistenceId(uint64_t persistenceId) const; + + // --- Interface qpid::broker::PersistableQueue --- + virtual void flush(); + virtual const std::string& getName() const; + virtual void setExternalQueueStore(ExternalQueueStore* inst); + + // --- Interface qpid::broker::DataStore --- + virtual uint64_t getSize(); + virtual void write(char* target); + +private: + static TxnHandle s_nullTxnHandle; // used for non-txn operations + + const std::string m_name; + AsyncStore* m_store; + AsyncResultQueue& m_resultQueue; + qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter! + mutable uint64_t m_persistenceId; + std::string m_persistableData; + QueueHandle m_queueHandle; + bool m_destroyPending; + bool m_destroyed; + + // --- Members & methods in msg handling path copied from qpid::Queue --- + struct UsageBarrier { + SimpleQueue& m_parent; + uint32_t m_count; + qpid::sys::Monitor m_monitor; + UsageBarrier(SimpleQueue& q); + bool acquire(); + void release(); + void destroy(); + }; + struct ScopedUse { + UsageBarrier& m_barrier; + const bool m_acquired; + ScopedUse(UsageBarrier& b); + ~ScopedUse(); + }; + UsageBarrier m_barrier; + std::auto_ptr m_messages; + void push(boost::shared_ptr qm, + bool isRecovery = false); + + // -- Async ops --- + bool asyncEnqueue(SimpleTxnBuffer* tb, + boost::shared_ptr qm); + static void handleAsyncEnqueueResult(const AsyncResultHandle* const arh); + bool asyncDequeue(SimpleTxnBuffer* tb, + boost::shared_ptr qm); + static void handleAsyncDequeueResult(const AsyncResultHandle* const arh); + + // --- Async op counter --- + void destroyCheck(const std::string& opDescr) const; + + // --- Async op completions (called through handleAsyncResult) --- + void createComplete(const boost::shared_ptr qc); + void flushComplete(const boost::shared_ptr qc); + void destroyComplete(const boost::shared_ptr qc); + void enqueueComplete(const boost::shared_ptr qc); + void dequeueComplete(const boost::shared_ptr qc); +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleQueue_h_ diff --git a/cpp/src/qpid/broker/SimpleQueuedMessage.cpp b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp new file mode 100644 index 0000000000..35ac799ecc --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueuedMessage.cpp + */ + +#include "SimpleQueuedMessage.h" + +#include "SimpleMessage.h" +#include "SimpleQueue.h" + +namespace qpid { +namespace broker { + +SimpleQueuedMessage::SimpleQueuedMessage() : + m_queue(0) +{} + +SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr msg) : + boost::enable_shared_from_this(), + m_queue(q), + m_msg(msg) +{ + if (m_queue->getStore()) { + m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()); + } +} + +SimpleQueuedMessage::SimpleQueuedMessage(const SimpleQueuedMessage& qm) : + boost::enable_shared_from_this(), + m_queue(qm.m_queue), + m_msg(qm.m_msg), + m_enqHandle(qm.m_enqHandle) +{} + +SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueuedMessage* const qm) : + boost::enable_shared_from_this(), + m_queue(qm->m_queue), + m_msg(qm->m_msg), + m_enqHandle(qm->m_enqHandle) +{} + +SimpleQueuedMessage::~SimpleQueuedMessage() {} + +SimpleQueue* +SimpleQueuedMessage::getQueue() const { + return m_queue; +} + +boost::intrusive_ptr +SimpleQueuedMessage::payload() const { + return m_msg; +} + +const EnqueueHandle& +SimpleQueuedMessage::enqHandle() const { + return m_enqHandle; +} + +EnqueueHandle& +SimpleQueuedMessage::enqHandle() { + return m_enqHandle; +} + +void +SimpleQueuedMessage::prepareEnqueue(SimpleTxnBuffer* tb) { + m_queue->enqueue(tb, shared_from_this()); +} + +void +SimpleQueuedMessage::commitEnqueue() { + m_queue->process(m_msg); +} + +void +SimpleQueuedMessage::abortEnqueue() { + m_queue->enqueueAborted(m_msg); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleQueuedMessage.h b/cpp/src/qpid/broker/SimpleQueuedMessage.h new file mode 100644 index 0000000000..1172eb73f3 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueuedMessage.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueuedMessage.h + */ + +#ifndef qpid_broker_SimpleQueuedMessage_h_ +#define qpid_broker_SimpleQueuedMessage_h_ + +#include "AsyncStore.h" +#include "EnqueueHandle.h" + +#include +#include + +namespace qpid { +namespace broker { + +class SimpleMessage; +class SimpleQueue; + +class SimpleQueuedMessage : public boost::enable_shared_from_this +{ +public: + SimpleQueuedMessage(); + SimpleQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr msg); + SimpleQueuedMessage(const SimpleQueuedMessage& qm); + SimpleQueuedMessage(SimpleQueuedMessage* const qm); + virtual ~SimpleQueuedMessage(); + SimpleQueue* getQueue() const; + boost::intrusive_ptr payload() const; + const EnqueueHandle& enqHandle() const; + EnqueueHandle& enqHandle(); + + // --- Transaction handling --- + void prepareEnqueue(qpid::broker::SimpleTxnBuffer* tb); + void commitEnqueue(); + void abortEnqueue(); + +private: + SimpleQueue* m_queue; + boost::intrusive_ptr m_msg; + qpid::broker::EnqueueHandle m_enqHandle; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleQueuedMessage_h_ diff --git a/cpp/src/qpid/broker/SimpleTxnAccept.cpp b/cpp/src/qpid/broker/SimpleTxnAccept.cpp new file mode 100644 index 0000000000..343bbb54c7 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnAccept.cpp @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnAccept.cpp + */ + +#include "SimpleTxnAccept.h" + +#include "SimpleDeliveryRecord.h" + +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { + +SimpleTxnAccept::SimpleTxnAccept(std::deque >& ops) : + m_ops(ops) +{} + +SimpleTxnAccept::~SimpleTxnAccept() {} + +// --- Interface TxnOp --- + +bool +SimpleTxnAccept::prepare(SimpleTxnBuffer* tb) throw() { + try { + for (std::deque >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->dequeue(tb); + } + return true; + } catch (const std::exception& e) { + QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)"); + } + return false; +} + +void +SimpleTxnAccept::commit() throw() { + try { + for (std::deque >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { + (*i)->committed(); + (*i)->setEnded(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what()); + } catch(...) { + QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)"); + } +} + +void +SimpleTxnAccept::rollback() throw() {} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleTxnAccept.h b/cpp/src/qpid/broker/SimpleTxnAccept.h new file mode 100644 index 0000000000..eb6963bc88 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnAccept.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnAccept.h + */ + +#ifndef tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ +#define tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ + +#include "SimpleTxnOp.h" + +#include "boost/shared_ptr.hpp" +#include + +namespace qpid { +namespace broker { + +class SimpleDeliveryRecord; + +class SimpleTxnAccept: public SimpleTxnOp { +public: + SimpleTxnAccept(std::deque >& ops); + virtual ~SimpleTxnAccept(); + + // --- Interface TxnOp --- + bool prepare(SimpleTxnBuffer* tb) throw(); + void commit() throw(); + void rollback() throw(); +private: + std::deque > m_ops; +}; + +}} // namespace qpid::broker + +#endif // tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ diff --git a/cpp/src/qpid/broker/SimpleTxnBuffer.cpp b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp new file mode 100644 index 0000000000..d72a785c2a --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnBuffer.cpp + */ + +#include "SimpleTxnBuffer.h" + +#include "AsyncResultHandle.h" +#include "SimpleTxnOp.h" +#include "TxnAsyncContext.h" + +#include "qpid/log/Statement.h" + +#include + +namespace qpid { +namespace broker { + +qpid::sys::Mutex SimpleTxnBuffer::s_uuidMutex; + +SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq) : + m_store(0), + m_resultQueue(arq), + m_tpcFlag(false), + m_submitOpCnt(0), + m_completeOpCnt(0), + m_state(NONE) +{ + createLocalXid(); +} + +SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid) : + m_store(0), + m_resultQueue(arq), + m_xid(xid), + m_tpcFlag(!xid.empty()), + m_submitOpCnt(0), + m_completeOpCnt(0), + m_state(NONE) +{ + if (m_xid.empty()) { + createLocalXid(); + } +} + +SimpleTxnBuffer::~SimpleTxnBuffer() {} + +TxnHandle& +SimpleTxnBuffer::getTxnHandle() { + return m_txnHandle; +} + +const std::string& +SimpleTxnBuffer::getXid() const { + return m_xid; +} + +bool +SimpleTxnBuffer::is2pc() const { + return m_tpcFlag; +} + +void +SimpleTxnBuffer::incrOpCnt() { + qpid::sys::ScopedLock l(m_submitOpCntMutex); + ++m_submitOpCnt; +} + +void +SimpleTxnBuffer::decrOpCnt() { + const uint32_t numOps = getNumOps(); + qpid::sys::ScopedLock l2(m_completeOpCntMutex); + qpid::sys::ScopedLock l3(m_submitOpCntMutex); + if (m_completeOpCnt == m_submitOpCnt) { + throw qpid::Exception("Transaction async operation count underflow"); + } + ++m_completeOpCnt; + if (numOps == m_submitOpCnt && numOps == m_completeOpCnt) { + asyncLocalCommit(); + } +} + +void +SimpleTxnBuffer::enlist(boost::shared_ptr op) { + qpid::sys::ScopedLock l(m_opsMutex); + m_ops.push_back(op); +} + +bool +SimpleTxnBuffer::prepare() { + qpid::sys::ScopedLock l(m_opsMutex); + for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + if (!(*i)->prepare(this)) { + return false; + } + } + return true; +} + +void +SimpleTxnBuffer::commit() { + qpid::sys::ScopedLock l(m_opsMutex); + for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->commit(); + } + m_ops.clear(); +} + +void +SimpleTxnBuffer::rollback() { + qpid::sys::ScopedLock l(m_opsMutex); + for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->rollback(); + } + m_ops.clear(); +} + +bool +SimpleTxnBuffer::commitLocal(AsyncTransactionalStore* const store) { + try { + m_store = store; + asyncLocalCommit(); + } catch (std::exception& e) { + QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)"); + } + return false; +} + +void +SimpleTxnBuffer::asyncLocalCommit() { + switch(m_state) { + case NONE: + m_state = PREPARE; + if (m_store) { + m_txnHandle = m_store->createTxnHandle(this); + } + prepare(/*shared_from_this()*/); + if (m_store) { + break; + } + case PREPARE: + m_state = COMMIT; + if (m_store) { + boost::shared_ptr tac(new TxnAsyncContext(this, + &handleAsyncCommitResult, + &m_resultQueue)); + m_store->testOp(); + m_store->submitCommit(m_txnHandle, tac); + break; + } + case COMMIT: + commit(); + m_state = COMPLETE; + delete this; + break; + case COMPLETE: + default: ; + } +} + +//static +void +SimpleTxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr tac = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + QPID_LOG(error, "TxnBuffer::handleAsyncCommitResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() + << " (" << arh->getErrMsg() << ")"); + tac->getTxnBuffer()->asyncLocalAbort(); + } else { + tac->getTxnBuffer()->asyncLocalCommit(); + } + } +} + +void +SimpleTxnBuffer::asyncLocalAbort() { + assert(m_store != 0); + switch (m_state) { + case NONE: + case PREPARE: + case COMMIT: + m_state = ROLLBACK; + { + boost::shared_ptr tac(new TxnAsyncContext(this, + &handleAsyncAbortResult, + &m_resultQueue)); + m_store->submitCommit(m_txnHandle, tac); + } + break; + case ROLLBACK: + rollback(); + m_state = COMPLETE; + delete this; + default: ; + } +} + +//static +void +SimpleTxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr tac = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() + << " failed: err=" << arh->getErrNo() << " (" << arh->getErrMsg() << ")"); + } + tac->getTxnBuffer()->asyncLocalAbort(); + } +} + +// private +uint32_t +SimpleTxnBuffer::getNumOps() const { + qpid::sys::ScopedLock l(m_opsMutex); + return m_ops.size(); +} + +// private +void +SimpleTxnBuffer::createLocalXid() +{ + uuid_t uuid; + { + qpid::sys::ScopedLock l(s_uuidMutex); + ::uuid_generate_random(uuid); // Not thread-safe + } + char uuidStr[37]; // 36-char uuid + trailing '\0' + ::uuid_unparse(uuid, uuidStr); + m_xid.assign(uuidStr); + QPID_LOG(debug, "Local XID created: \"" << m_xid << "\""); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleTxnBuffer.h b/cpp/src/qpid/broker/SimpleTxnBuffer.h new file mode 100644 index 0000000000..b2164cfeed --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnBuffer.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnBuffer.h + */ + +#ifndef qpid_broker_SimpleTxnBuffer_h_ +#define qpid_broker_SimpleTxnBuffer_h_ + +#include "TxnHandle.h" + +#include "qpid/sys/Mutex.h" + +#include +#include + +namespace qpid { +namespace broker { + +class AsyncResultHandle; +class AsyncResultQueue; +class AsyncTransactionalStore; +class SimpleTxnOp; + +class SimpleTxnBuffer { +public: + SimpleTxnBuffer(AsyncResultQueue& arq); + SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid); + virtual ~SimpleTxnBuffer(); + TxnHandle& getTxnHandle(); + const std::string& getXid() const; + bool is2pc() const; + void incrOpCnt(); + void decrOpCnt(); + + void enlist(boost::shared_ptr op); + bool prepare(); + void commit(); + void rollback(); + bool commitLocal(AsyncTransactionalStore* const store); + + // --- Async operations --- + void asyncLocalCommit(); + static void handleAsyncCommitResult(const AsyncResultHandle* const arh); + void asyncLocalAbort(); + static void handleAsyncAbortResult(const AsyncResultHandle* const arh); + +private: + mutable qpid::sys::Mutex m_opsMutex; + mutable qpid::sys::Mutex m_submitOpCntMutex; + mutable qpid::sys::Mutex m_completeOpCntMutex; + static qpid::sys::Mutex s_uuidMutex; + + std::vector > m_ops; + TxnHandle m_txnHandle; + AsyncTransactionalStore* m_store; + AsyncResultQueue& m_resultQueue; + std::string m_xid; + bool m_tpcFlag; + uint32_t m_submitOpCnt; + uint32_t m_completeOpCnt; + + typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState; + e_txnState m_state; + + uint32_t getNumOps() const; + void createLocalXid(); +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleTxnBuffer_h_ diff --git a/cpp/src/qpid/broker/SimpleTxnOp.h b/cpp/src/qpid/broker/SimpleTxnOp.h new file mode 100644 index 0000000000..2cec2da8f0 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnOp.h @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnOp.h + */ + +#ifndef qpid_broker_SimpleTxnOp_h_ +#define qpid_broker_SimpleTxnOp_h_ + +#include + +namespace qpid { +namespace broker { + +class SimpleTxnBuffer; + +class SimpleTxnOp{ +public: + virtual ~SimpleTxnOp() {} + virtual bool prepare(SimpleTxnBuffer*) throw() = 0; + virtual void commit() throw() = 0; + virtual void rollback() throw() = 0; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleTxnOp_h_ diff --git a/cpp/src/qpid/broker/SimpleTxnPublish.cpp b/cpp/src/qpid/broker/SimpleTxnPublish.cpp new file mode 100644 index 0000000000..6ad6a108ea --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnPublish.cpp @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnPublish.cpp + */ + +#include "SimpleTxnPublish.h" + +#include "SimpleMessage.h" +#include "SimpleQueue.h" +#include "SimpleQueuedMessage.h" + +#include "qpid/log/Statement.h" +#include + +namespace qpid { +namespace broker { + +SimpleTxnPublish::SimpleTxnPublish(boost::intrusive_ptr msg) : + m_msg(msg) +{} + +SimpleTxnPublish::~SimpleTxnPublish() {} + +bool +SimpleTxnPublish::prepare(SimpleTxnBuffer* tb) throw() { + try { + while (!m_queues.empty()) { + m_queues.front()->prepareEnqueue(tb); + m_prepared.push_back(m_queues.front()); + m_queues.pop_front(); + } + return true; + } catch (const std::exception& e) { + QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)"); + } + return false; +} + +void +SimpleTxnPublish::commit() throw() { + try { + for (std::list >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + (*i)->commitEnqueue(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)"); + } +} + +void +SimpleTxnPublish::rollback() throw() { + try { + for (std::list >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + (*i)->abortEnqueue(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)"); + } +} + +uint64_t +SimpleTxnPublish::contentSize() { + return m_msg->contentSize(); +} + +void +SimpleTxnPublish::deliverTo(const boost::shared_ptr& queue) { + m_queues.push_back(boost::shared_ptr(new SimpleQueuedMessage(queue.get(), m_msg))); + m_delivered = true; +} + +SimpleMessage& +SimpleTxnPublish::getMessage() { + return *m_msg; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleTxnPublish.h b/cpp/src/qpid/broker/SimpleTxnPublish.h new file mode 100644 index 0000000000..0aaf8e4ba0 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnPublish.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnPublish.h + */ + +#ifndef qpid_broker_SimpleTxnPublish_h_ +#define qpid_broker_SimpleTxnPublish_h_ + +#include "SimpleDeliverable.h" +#include "SimpleTxnOp.h" + +#include +#include +#include + + +namespace qpid { +namespace broker { + +class SimpleQueuedMessage; +class SimpleMessage; +class SimpleQueue; + +class SimpleTxnPublish : public SimpleTxnOp, + public SimpleDeliverable +{ +public: + SimpleTxnPublish(boost::intrusive_ptr msg); + virtual ~SimpleTxnPublish(); + + // --- Interface TxOp --- + bool prepare(SimpleTxnBuffer* tb) throw(); + void commit() throw(); + void rollback() throw(); + + // --- Interface Deliverable --- + uint64_t contentSize(); + void deliverTo(const boost::shared_ptr& queue); + SimpleMessage& getMessage(); + +private: + boost::intrusive_ptr m_msg; + std::list > m_queues; + std::list > m_prepared; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleTxnPublish_h_ diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp index 63e2de2b41..527cb4741f 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.cpp +++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp @@ -26,7 +26,7 @@ namespace qpid { namespace broker { -TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, +TxnAsyncContext::TxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq): m_tb(tb), @@ -37,7 +37,7 @@ TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, TxnAsyncContext::~TxnAsyncContext() {} -TxnBuffer* +SimpleTxnBuffer* TxnAsyncContext::getTxnBuffer() const { return m_tb; diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h index 9c617238e8..04f6ef76f5 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.h +++ b/cpp/src/qpid/broker/TxnAsyncContext.h @@ -29,38 +29,34 @@ #include "qpid/asyncStore/AsyncOperation.h" namespace qpid { -//namespace asyncStore { -//class AsyncOperation; -//} namespace broker { class AsyncResultHandle; class AsyncResultQueue; -//class TxnHandle; typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); class TxnAsyncContext: public BrokerAsyncContext { public: - TxnAsyncContext(TxnBuffer* const tb, + TxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); virtual ~TxnAsyncContext(); - TxnBuffer* getTxnBuffer() const; + SimpleTxnBuffer* getTxnBuffer() const; // --- Interface BrokerAsyncContext --- AsyncResultQueue* getAsyncResultQueue() const; void invokeCallback(const AsyncResultHandle* const) const; private: - TxnBuffer* const m_tb; + SimpleTxnBuffer* const m_tb; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; class TpcTxnAsyncContext : public TxnAsyncContext { public: - TpcTxnAsyncContext(TxnBuffer* const tb, + TpcTxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : TxnAsyncContext(tb, rcb, arq) diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp deleted file mode 100644 index 4d6e7b7918..0000000000 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnBuffer.cpp - */ - -#include "TxnBuffer.h" - -#include "AsyncResultHandle.h" -#include "TxnAsyncContext.h" -#include "TxnOp.h" - -#include "qpid/log/Statement.h" - -#include - -namespace qpid { -namespace broker { - -qpid::sys::Mutex TxnBuffer::s_uuidMutex; - -TxnBuffer::TxnBuffer(AsyncResultQueue& arq) : - m_store(0), - m_resultQueue(arq), - m_tpcFlag(false), - m_submitOpCnt(0), - m_completeOpCnt(0), - m_state(NONE) -{ - createLocalXid(); -} - -TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) : - m_store(0), - m_resultQueue(arq), - m_xid(xid), - m_tpcFlag(!xid.empty()), - m_submitOpCnt(0), - m_completeOpCnt(0), - m_state(NONE) -{ - if (m_xid.empty()) { - createLocalXid(); - } -} - -TxnBuffer::~TxnBuffer() {} - -TxnHandle& -TxnBuffer::getTxnHandle() { - return m_txnHandle; -} - -const std::string& -TxnBuffer::getXid() const { - return m_xid; -} - -bool -TxnBuffer::is2pc() const { - return m_tpcFlag; -} - -void -TxnBuffer::incrOpCnt() { - qpid::sys::ScopedLock l(m_submitOpCntMutex); - ++m_submitOpCnt; -} - -void -TxnBuffer::decrOpCnt() { - const uint32_t numOps = getNumOps(); - qpid::sys::ScopedLock l2(m_completeOpCntMutex); - qpid::sys::ScopedLock l3(m_submitOpCntMutex); - if (m_completeOpCnt == m_submitOpCnt) { - throw qpid::Exception("Transaction async operation count underflow"); - } - ++m_completeOpCnt; - if (numOps == m_submitOpCnt && numOps == m_completeOpCnt) { - asyncLocalCommit(); - } -} - -void -TxnBuffer::enlist(boost::shared_ptr op) { - qpid::sys::ScopedLock l(m_opsMutex); - m_ops.push_back(op); -} - -bool -TxnBuffer::prepare() { - qpid::sys::ScopedLock l(m_opsMutex); - for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - if (!(*i)->prepare(this)) { - return false; - } - } - return true; -} - -void -TxnBuffer::commit() { - qpid::sys::ScopedLock l(m_opsMutex); - for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - (*i)->commit(); - } - m_ops.clear(); -} - -void -TxnBuffer::rollback() { - qpid::sys::ScopedLock l(m_opsMutex); - for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - (*i)->rollback(); - } - m_ops.clear(); -} - -bool -TxnBuffer::commitLocal(AsyncTransactionalStore* const store) { - try { - m_store = store; - asyncLocalCommit(); - } catch (std::exception& e) { - QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)"); - } - return false; -} - -void -TxnBuffer::asyncLocalCommit() { - switch(m_state) { - case NONE: - m_state = PREPARE; - if (m_store) { - m_txnHandle = m_store->createTxnHandle(this); - } - prepare(/*shared_from_this()*/); - if (m_store) { - break; - } - case PREPARE: - m_state = COMMIT; - if (m_store) { - boost::shared_ptr tac(new TxnAsyncContext(this, - &handleAsyncCommitResult, - &m_resultQueue)); - m_store->testOp(); - m_store->submitCommit(m_txnHandle, tac); - break; - } - case COMMIT: - commit(); - m_state = COMPLETE; - delete this; - break; - case COMPLETE: - default: ; - } -} - -//static -void -TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr tac = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - QPID_LOG(error, "TxnBuffer::handleAsyncCommitResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() - << " (" << arh->getErrMsg() << ")"); - tac->getTxnBuffer()->asyncLocalAbort(); - } else { - tac->getTxnBuffer()->asyncLocalCommit(); - } - } -} - -void -TxnBuffer::asyncLocalAbort() { - assert(m_store != 0); - switch (m_state) { - case NONE: - case PREPARE: - case COMMIT: - m_state = ROLLBACK; - { - boost::shared_ptr tac(new TxnAsyncContext(this, - &handleAsyncAbortResult, - &m_resultQueue)); - m_store->submitCommit(m_txnHandle, tac); - } - break; - case ROLLBACK: - rollback(); - m_state = COMPLETE; - delete this; - default: ; - } -} - -//static -void -TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr tac = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() - << " failed: err=" << arh->getErrNo() << " (" << arh->getErrMsg() << ")"); - } - tac->getTxnBuffer()->asyncLocalAbort(); - } -} - -// private -uint32_t -TxnBuffer::getNumOps() const { - qpid::sys::ScopedLock l(m_opsMutex); - return m_ops.size(); -} - -// private -void -TxnBuffer::createLocalXid() -{ - uuid_t uuid; - { - qpid::sys::ScopedLock l(s_uuidMutex); - ::uuid_generate_random(uuid); // Not thread-safe - } - char uuidStr[37]; // 36-char uuid + trailing '\0' - ::uuid_unparse(uuid, uuidStr); - m_xid.assign(uuidStr); - QPID_LOG(debug, "Local XID created: \"" << m_xid << "\""); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h deleted file mode 100644 index 02569f6545..0000000000 --- a/cpp/src/qpid/broker/TxnBuffer.h +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnBuffer.h - */ - -#ifndef qpid_broker_TxnBuffer_h_ -#define qpid_broker_TxnBuffer_h_ - -#include "TxnHandle.h" - -#include "qpid/sys/Mutex.h" - -#include -#include - -namespace qpid { -namespace broker { - -class AsyncResultHandle; -class AsyncResultQueue; -class AsyncTransactionalStore; -class TxnOp; - -class TxnBuffer { -public: - TxnBuffer(AsyncResultQueue& arq); - TxnBuffer(AsyncResultQueue& arq, std::string& xid); - virtual ~TxnBuffer(); - TxnHandle& getTxnHandle(); - const std::string& getXid() const; - bool is2pc() const; - void incrOpCnt(); - void decrOpCnt(); - - void enlist(boost::shared_ptr op); - bool prepare(); - void commit(); - void rollback(); - bool commitLocal(AsyncTransactionalStore* const store); - - // --- Async operations --- - void asyncLocalCommit(); - static void handleAsyncCommitResult(const AsyncResultHandle* const arh); - void asyncLocalAbort(); - static void handleAsyncAbortResult(const AsyncResultHandle* const arh); - -private: - mutable qpid::sys::Mutex m_opsMutex; - mutable qpid::sys::Mutex m_submitOpCntMutex; - mutable qpid::sys::Mutex m_completeOpCntMutex; - static qpid::sys::Mutex s_uuidMutex; - - std::vector > m_ops; - TxnHandle m_txnHandle; - AsyncTransactionalStore* m_store; - AsyncResultQueue& m_resultQueue; - std::string m_xid; - bool m_tpcFlag; - uint32_t m_submitOpCnt; - uint32_t m_completeOpCnt; - - typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState; - e_txnState m_state; - - uint32_t getNumOps() const; - void createLocalXid(); -}; - -}} // namespace qpid::broker - -#endif // qpid_broker_TxnBuffer_h_ diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/TxnOp.h deleted file mode 100644 index bcff87551c..0000000000 --- a/cpp/src/qpid/broker/TxnOp.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnOp.h - */ - -#ifndef qpid_broker_TxnOp_h_ -#define qpid_broker_TxnOp_h_ - -#include - -namespace qpid { -namespace broker { - -class TxnBuffer; - -class TxnOp{ -public: - virtual ~TxnOp() {} - virtual bool prepare(qpid::broker::TxnBuffer*) throw() = 0; - virtual void commit() throw() = 0; - virtual void rollback() throw() = 0; -}; - -}} // namespace qpid::broker - -#endif // qpid_broker_TxnOp_h_ diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake index 94631bb8ea..3dcd81c3d7 100644 --- a/cpp/src/tests/asyncstore.cmake +++ b/cpp/src/tests/asyncstore.cmake @@ -51,20 +51,20 @@ endif (UNIX) # Async store perf test (asyncPerf) set (asyncStorePerf_SOURCES - storePerftools/asyncPerf/Deliverable.cpp - storePerftools/asyncPerf/DeliveryRecord.cpp - storePerftools/asyncPerf/MessageAsyncContext.cpp storePerftools/asyncPerf/MessageConsumer.cpp - storePerftools/asyncPerf/MessageDeque.cpp storePerftools/asyncPerf/MessageProducer.cpp storePerftools/asyncPerf/PerfTest.cpp - storePerftools/asyncPerf/QueuedMessage.cpp - storePerftools/asyncPerf/SimpleMessage.cpp - storePerftools/asyncPerf/SimpleQueue.cpp +# storePerftools/asyncPerf/SimpleDeliverable.cpp +# storePerftools/asyncPerf/SimpleDeliveryRecord.cpp +# storePerftools/asyncPerf/SimpleMessage.cpp +# storePerftools/asyncPerf/SimpleMessageAsyncContext.cpp +# storePerftools/asyncPerf/SimpleMessageDeque.cpp +# storePerftools/asyncPerf/SimpleQueue.cpp +# storePerftools/asyncPerf/SimpleQueuedMessage.cpp +# storePerftools/asyncPerf/SimpleTxnAccept.cpp +# storePerftools/asyncPerf/SimpleTxnPublish.cpp storePerftools/asyncPerf/TestOptions.cpp storePerftools/asyncPerf/TestResult.cpp - storePerftools/asyncPerf/TxnAccept.cpp - storePerftools/asyncPerf/TxnPublish.cpp storePerftools/common/Parameters.cpp storePerftools/common/PerftoolError.cpp diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp deleted file mode 100644 index 9da7e348e0..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Deliverable.cpp - */ - -#include "Deliverable.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -Deliverable::Deliverable() : - m_delivered(false) -{} - -Deliverable::~Deliverable() -{} - -bool -Deliverable::isDelivered() const -{ - return m_delivered; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h deleted file mode 100644 index 990d53a199..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Deliverable.h - */ - -#ifndef tests_storePerftools_asyncPerf_Deliverable_h_ -#define tests_storePerftools_asyncPerf_Deliverable_h_ - -#include -#include // uint64_t - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class SimpleMessage; -class SimpleQueue; - -class Deliverable -{ -public: - Deliverable(); - virtual ~Deliverable(); - - virtual uint64_t contentSize() = 0; - virtual void deliverTo(const boost::shared_ptr& queue) = 0; - virtual SimpleMessage& getMessage() = 0; - virtual bool isDelivered() const; - -protected: - bool m_delivered; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_Deliverable_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp deleted file mode 100644 index 6f33369a26..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file DeliveryRecord.cpp - */ - -#include "DeliveryRecord.h" - -#include "MessageConsumer.h" -#include "QueuedMessage.h" -#include "SimpleMessage.h" -#include "SimpleQueue.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -DeliveryRecord::DeliveryRecord(boost::shared_ptr qm, - MessageConsumer& mc, - bool accepted) : - m_queuedMessage(qm), - m_msgConsumer(mc), - m_accepted(accepted), - m_ended(accepted) -{} - -DeliveryRecord::~DeliveryRecord() -{} - -bool -DeliveryRecord::accept() -{ - if (!m_ended) { - m_queuedMessage->getQueue()->dequeue(m_queuedMessage); - m_accepted = true; - setEnded(); - } - return isRedundant(); -} - -bool -DeliveryRecord::isAccepted() const -{ - return m_accepted; -} - -bool -DeliveryRecord::setEnded() -{ - m_ended = true; - m_queuedMessage->payload() = boost::intrusive_ptr(0); - return isRedundant(); -} - -bool -DeliveryRecord::isEnded() const -{ - return m_ended; -} - -bool -DeliveryRecord::isRedundant() const -{ - return m_ended; -} - -void -DeliveryRecord::dequeue(qpid::broker::TxnBuffer* tb) -{ - m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage); -} - -void -DeliveryRecord::committed() const -{ - m_msgConsumer.commitComplete(); -} - -boost::shared_ptr -DeliveryRecord::getQueuedMessage() const -{ - return m_queuedMessage; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h deleted file mode 100644 index 6c5d87f374..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file DeliveryRecord.h - */ - -#ifndef tests_storePerftools_asyncPerf_DeliveryRecord_h_ -#define tests_storePerftools_asyncPerf_DeliveryRecord_h_ - -//#include "QueuedMessage.h" - -#include - -namespace qpid { -namespace broker { -class TxnBuffer; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MessageConsumer; -class QueuedMessage; - -class DeliveryRecord { -public: - DeliveryRecord(boost::shared_ptr qm, - MessageConsumer& mc, - bool accepted); - virtual ~DeliveryRecord(); - bool accept(); - bool isAccepted() const; - bool setEnded(); - bool isEnded() const; - bool isRedundant() const; - void dequeue(qpid::broker::TxnBuffer* tb); - void committed() const; - boost::shared_ptr getQueuedMessage() const; -private: - boost::shared_ptr m_queuedMessage; - MessageConsumer& m_msgConsumer; - bool m_accepted : 1; - bool m_ended : 1; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_DeliveryRecord_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp deleted file mode 100644 index e3bfe9ae7a..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MessageContext.cpp - */ - -#include "MessageAsyncContext.h" - -#include "SimpleMessage.h" - -#include - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr msg, - boost::shared_ptr q) : - m_msg(msg), - m_q(q) -{ - assert(m_msg.get() != 0); - assert(m_q.get() != 0); -} - -MessageAsyncContext::~MessageAsyncContext() -{} - -boost::intrusive_ptr -MessageAsyncContext::getMessage() const -{ - return m_msg; -} - -boost::shared_ptr -MessageAsyncContext::getQueue() const -{ - return m_q; -} - -void -MessageAsyncContext::destroy() -{ - delete this; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h deleted file mode 100644 index 9252fbda45..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MessageContext.h - */ - -#ifndef tests_storePerfTools_asyncPerf_MessageContext_h_ -#define tests_storePerfTools_asyncPerf_MessageContext_h_ - -#include "qpid/asyncStore/AsyncOperation.h" - -#include -#include - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class SimpleMessage; -class SimpleQueue; - -class MessageAsyncContext : public qpid::broker::BrokerAsyncContext -{ -public: - MessageAsyncContext(boost::intrusive_ptr msg, - boost::shared_ptr q); - virtual ~MessageAsyncContext(); - boost::intrusive_ptr getMessage() const; - boost::shared_ptr getQueue() const; - void destroy(); - -private: - boost::intrusive_ptr m_msg; - boost::shared_ptr m_q; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerfTools_asyncPerf_MessageContext_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 6aa477c470..6477696bd6 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -23,13 +23,12 @@ #include "MessageConsumer.h" -#include "DeliveryRecord.h" -#include "SimpleQueue.h" #include "TestOptions.h" -#include "TxnAccept.h" -#include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/TxnBuffer.h" +#include "qpid/broker/SimpleDeliveryRecord.h" +#include "qpid/broker/SimpleQueue.h" +#include "qpid/broker/SimpleTxnAccept.h" +#include "qpid/broker/SimpleTxnBuffer.h" #include // uint32_t @@ -38,9 +37,9 @@ namespace storePerftools { namespace asyncPerf { MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr queue) : + boost::shared_ptr queue) : m_perfTestParams(perfTestParams), m_store(store), m_resultQueue(arq), @@ -50,7 +49,7 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, MessageConsumer::~MessageConsumer() {} void -MessageConsumer::record(boost::shared_ptr dr) { +MessageConsumer::record(boost::shared_ptr dr) { m_unacked.push_back(dr); } @@ -61,9 +60,9 @@ void* MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; uint16_t opsInTxnCnt = 0U; - qpid::broker::TxnBuffer* tb = 0; + qpid::broker::SimpleTxnBuffer* tb = 0; if (useTxns) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } uint32_t msgsPerConsumer = m_perfTestParams.m_numEnqThreadsPerQueue * m_perfTestParams.m_numMsgs / @@ -74,19 +73,19 @@ MessageConsumer::runConsumers() { ++numMsgs; if (useTxns) { // --- Transactional dequeue --- - boost::shared_ptr ta(new TxnAccept(m_unacked)); + boost::shared_ptr ta(new qpid::broker::SimpleTxnAccept(m_unacked)); m_unacked.clear(); tb->enlist(ta); if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { tb->commitLocal(m_store); if (numMsgs < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } opsInTxnCnt = 0U; } } else { // --- Non-transactional dequeue --- - for (std::deque >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { + for (std::deque >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { (*i)->accept(); } m_unacked.clear(); diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h index b110520889..d5a881f7e0 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h @@ -24,44 +24,44 @@ #ifndef tests_storePerftools_asyncPerf_MessageConsumer_h_ #define tests_storePerftools_asyncPerf_MessageConsumer_h_ +#include "qpid/broker/SimpleConsumer.h" + #include "boost/shared_ptr.hpp" #include namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} namespace broker { class AsyncResultQueue; +class AsyncStore; +class SimpleDeliveryRecord; +class SimpleQueue; }} namespace tests { namespace storePerftools { namespace asyncPerf { -class DeliveryRecord; -class SimpleQueue; class TestOptions; -class MessageConsumer +class MessageConsumer: public qpid::broker::SimpleConsumer { public: MessageConsumer(const TestOptions& perfTestParams, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr queue); + boost::shared_ptr queue); virtual ~MessageConsumer(); - void record(boost::shared_ptr dr); + void record(boost::shared_ptr dr); void commitComplete(); void* runConsumers(); static void* startConsumers(void* ptr); private: const TestOptions& m_perfTestParams; - qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncStore* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - boost::shared_ptr m_queue; - std::deque > m_unacked; + boost::shared_ptr m_queue; + std::deque > m_unacked; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp deleted file mode 100644 index 1fa2c087ac..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MessageDeque.cpp - */ - -#include "MessageDeque.h" - -#include "QueuedMessage.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -MessageDeque::MessageDeque() -{} - -MessageDeque::~MessageDeque() -{} - -uint32_t -MessageDeque::size() -{ - qpid::sys::ScopedLock l(m_msgMutex); - return m_messages.size(); -} - -bool -MessageDeque::push(boost::shared_ptr& added) -{ - qpid::sys::ScopedLock l(m_msgMutex); - m_messages.push_back(added); - return false; -} - -bool -MessageDeque::consume(boost::shared_ptr& msg) -{ - qpid::sys::ScopedLock l(m_msgMutex); - if (!m_messages.empty()) { - msg = m_messages.front(); - m_messages.pop_front(); - return true; - } - return false; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h deleted file mode 100644 index 021015f3e0..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file MessageDeque.h - */ - -/* - * This is a copy of qpid::broker::MessageDeque.h, but using the local - * tests::storePerftools::asyncPerf::QueuedMessage class instead of - * qpid::broker::QueuedMessage. - */ - -#ifndef tests_storePerftools_asyncPerf_MessageDeque_h_ -#define tests_storePerftools_asyncPerf_MessageDeque_h_ - -#include "Messages.h" - -#include "qpid/sys/Mutex.h" - -#include - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MessageDeque : public Messages -{ -public: - MessageDeque(); - virtual ~MessageDeque(); - uint32_t size(); - bool push(boost::shared_ptr& added); - bool consume(boost::shared_ptr& msg); -private: - std::deque > m_messages; - qpid::sys::Mutex m_msgMutex; - -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_MessageDeque_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 974f3f3981..f88d305a38 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -23,13 +23,12 @@ #include "MessageProducer.h" -#include "SimpleMessage.h" -#include "SimpleQueue.h" #include "TestOptions.h" -#include "TxnPublish.h" -#include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/TxnBuffer.h" +#include "qpid/broker/SimpleMessage.h" +#include "qpid/broker/SimpleQueue.h" +#include "qpid/broker/SimpleTxnBuffer.h" +#include "qpid/broker/SimpleTxnPublish.h" #include // uint32_t @@ -39,9 +38,9 @@ namespace asyncPerf { MessageProducer::MessageProducer(const TestOptions& perfTestParams, const char* msgData, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr queue) : + boost::shared_ptr queue) : m_perfTestParams(perfTestParams), m_msgData(msgData), m_store(store), @@ -55,14 +54,14 @@ void* MessageProducer::runProducers() { const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U; uint16_t recsInTxnCnt = 0U; - qpid::broker::TxnBuffer* tb = 0; + qpid::broker::SimpleTxnBuffer* tb = 0; if (useTxns) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } for (uint32_t numMsgs=0; numMsgs msg(new SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); + boost::intrusive_ptr msg(new qpid::broker::SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store)); if (useTxns) { - boost::shared_ptr op(new TxnPublish(msg)); + boost::shared_ptr op(new qpid::broker::SimpleTxnPublish(msg)); op->deliverTo(m_queue); tb->enlist(op); if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) { @@ -72,7 +71,7 @@ MessageProducer::runProducers() { // transaction until the current commit cycle completes. So use another instance. This // instance should auto-delete when the async commit cycle completes. if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) { - tb = new qpid::broker::TxnBuffer(m_resultQueue); + tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue); } recsInTxnCnt = 0U; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h index 127408e3db..6f98d03503 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h @@ -27,19 +27,17 @@ #include "boost/shared_ptr.hpp" namespace qpid { -namespace asyncStore { -class AsyncStoreImpl; -} namespace broker { class AsyncResultQueue; -class TxnBuffer; +class AsyncStore; +class SimpleQueue; +class SimpleTxnBuffer; }} namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleQueue; class TestOptions; class MessageProducer @@ -47,18 +45,18 @@ class MessageProducer public: MessageProducer(const TestOptions& perfTestParams, const char* msgData, - qpid::asyncStore::AsyncStoreImpl* store, + qpid::broker::AsyncStore* store, qpid::broker::AsyncResultQueue& arq, - boost::shared_ptr queue); + boost::shared_ptr queue); virtual ~MessageProducer(); void* runProducers(); static void* startProducers(void* ptr); private: const TestOptions& m_perfTestParams; const char* m_msgData; - qpid::asyncStore::AsyncStoreImpl* m_store; + qpid::broker::AsyncStore* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; - boost::shared_ptr m_queue; + boost::shared_ptr m_queue; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/tests/storePerftools/asyncPerf/Messages.h deleted file mode 100644 index c1bfa328ea..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/Messages.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file Messages.h - */ - -/* - * This is a copy of qpid::broker::Messages.h, but using the local - * tests::storePerftools::asyncPerf::QueuedMessage class instead of - * qpid::broker::QueuedMessage. - */ - -#ifndef tests_storePerftools_asyncPerf_Messages_h_ -#define tests_storePerftools_asyncPerf_Messages_h_ - -#include -#include - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class QueuedMessage; - -class Messages -{ -public: - virtual ~Messages() {} - virtual uint32_t size() = 0; - virtual bool push(boost::shared_ptr& added) = 0; - virtual bool consume(boost::shared_ptr& msg) = 0; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_Messages_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp index 6377cc0d85..c05eb0487d 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp @@ -25,7 +25,6 @@ #include "MessageConsumer.h" #include "MessageProducer.h" -#include "SimpleQueue.h" #include "tests/storePerftools/version.h" #include "tests/storePerftools/common/ScopedTimer.h" @@ -34,6 +33,7 @@ #include "qpid/Modules.h" // Use with loading store as module #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/asyncStore/AsyncStoreOptions.h" +#include "qpid/broker/SimpleQueue.h" #include @@ -55,8 +55,7 @@ PerfTest::PerfTest(const TestOptions& to, std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize); } -PerfTest::~PerfTest() -{ +PerfTest::~PerfTest() { m_poller->shutdown(); m_pollingThread.join(); @@ -68,8 +67,7 @@ PerfTest::~PerfTest() } void -PerfTest::run() -{ +PerfTest::run() { if (m_testOpts.m_durable) { prepareStore(); } @@ -113,8 +111,7 @@ PerfTest::run() } void -PerfTest::toStream(std::ostream& os) const -{ +PerfTest::toStream(std::ostream& os) const { m_testOpts.printVals(os); os << std::endl; m_storeOpts.printVals(os); @@ -124,16 +121,15 @@ PerfTest::toStream(std::ostream& os) const // private void -PerfTest::prepareStore() -{ - m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); - m_store->initialize(); +PerfTest::prepareStore() { + qpid::asyncStore::AsyncStoreImpl* s = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts); + s->initialize(); + m_store = s; } // private void -PerfTest::destroyStore() -{ +PerfTest::destroyStore() { if (m_store) { delete m_store; } @@ -141,12 +137,11 @@ PerfTest::destroyStore() // private void -PerfTest::prepareQueues() -{ +PerfTest::prepareQueues() { for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) { std::ostringstream qname; qname << "queue_" << std::setw(4) << std::setfill('0') << i; - boost::shared_ptr mpq(new SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); + boost::shared_ptr mpq(new qpid::broker::SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue)); mpq->asyncCreate(); m_queueList.push_back(mpq); } @@ -154,8 +149,7 @@ PerfTest::prepareQueues() // private void -PerfTest::destroyQueues() -{ +PerfTest::destroyQueues() { while (m_queueList.size() > 0) { m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion); m_queueList.pop_front(); @@ -163,8 +157,7 @@ PerfTest::destroyQueues() } int -runPerfTest(int argc, char** argv) -{ +runPerfTest(int argc, char** argv) { // Load async store module qpid::tryShlib ("asyncStore.so", false); @@ -212,7 +205,6 @@ runPerfTest(int argc, char** argv) // ----------------------------------------------------------------- int -main(int argc, char** argv) -{ +main(int argc, char** argv) { return tests::storePerftools::asyncPerf::runPerfTest(argc, argv); } diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h index e4d99021b5..27d8d08faf 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h +++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h @@ -37,9 +37,11 @@ namespace qpid { namespace asyncStore { -class AsyncStoreImpl; class AsyncStoreOptions; } +namespace broker { +class SimpleQueue; +} namespace sys { class Poller; }} @@ -48,7 +50,6 @@ namespace tests { namespace storePerftools { namespace asyncPerf { -class SimpleQueue; class MessageConsumer; class MessageProducer; class TestOptions; @@ -73,8 +74,8 @@ private: boost::shared_ptr m_poller; qpid::sys::Thread m_pollingThread; qpid::broker::AsyncResultQueueImpl m_resultQueue; - qpid::asyncStore::AsyncStoreImpl* m_store; - std::deque > m_queueList; + qpid::broker::AsyncStore* m_store; + std::deque > m_queueList; std::deque > m_producers; std::deque > m_consumers; diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp deleted file mode 100644 index 0d16248c7f..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file QueuedMessage.cpp - */ - -#include "QueuedMessage.h" - -#include "SimpleMessage.h" -#include "SimpleQueue.h" - -#include "qpid/asyncStore/AsyncStoreImpl.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -QueuedMessage::QueuedMessage() : - m_queue(0) -{} - -QueuedMessage::QueuedMessage(SimpleQueue* q, - boost::intrusive_ptr msg) : - boost::enable_shared_from_this(), - m_queue(q), - m_msg(msg) -{ - if (m_queue->getStore()) { - m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()); - } -} - -QueuedMessage::QueuedMessage(const QueuedMessage& qm) : - boost::enable_shared_from_this(), - m_queue(qm.m_queue), - m_msg(qm.m_msg), - m_enqHandle(qm.m_enqHandle) -{} - -QueuedMessage::QueuedMessage(QueuedMessage* const qm) : - boost::enable_shared_from_this(), - m_queue(qm->m_queue), - m_msg(qm->m_msg), - m_enqHandle(qm->m_enqHandle) -{} - -QueuedMessage::~QueuedMessage() -{} - -SimpleQueue* -QueuedMessage::getQueue() const -{ - return m_queue; -} - -boost::intrusive_ptr -QueuedMessage::payload() const -{ - return m_msg; -} - -const qpid::broker::EnqueueHandle& -QueuedMessage::enqHandle() const -{ - return m_enqHandle; -} - -qpid::broker::EnqueueHandle& -QueuedMessage::enqHandle() -{ - return m_enqHandle; -} - -void -QueuedMessage::prepareEnqueue(qpid::broker::TxnBuffer* tb) -{ - m_queue->enqueue(tb, shared_from_this()); -} - -void -QueuedMessage::commitEnqueue() -{ - m_queue->process(m_msg); -} - -void -QueuedMessage::abortEnqueue() -{ - m_queue->enqueueAborted(m_msg); -} - -}}} // namespace tests::storePerfTools diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h deleted file mode 100644 index 630fe1aedc..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file QueuedMessage.h - */ - -#ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_ -#define tests_storePerftools_asyncPerf_QueuedMessage_h_ - -#include "qpid/broker/AsyncStore.h" -#include "qpid/broker/EnqueueHandle.h" - -#include -#include - -namespace qpid { -namespace broker { - -class TxnHandle; - -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class SimpleMessage; -class SimpleQueue; - -class QueuedMessage : public boost::enable_shared_from_this -{ -public: - QueuedMessage(); - QueuedMessage(SimpleQueue* q, - boost::intrusive_ptr msg); - QueuedMessage(const QueuedMessage& qm); - QueuedMessage(QueuedMessage* const qm); - virtual ~QueuedMessage(); - SimpleQueue* getQueue() const; - boost::intrusive_ptr payload() const; - const qpid::broker::EnqueueHandle& enqHandle() const; - qpid::broker::EnqueueHandle& enqHandle(); - - // --- Transaction handling --- - void prepareEnqueue(qpid::broker::TxnBuffer* tb); - void commitEnqueue(); - void abortEnqueue(); - -private: - SimpleQueue* m_queue; - boost::intrusive_ptr m_msg; - qpid::broker::EnqueueHandle m_enqHandle; -}; - -}}} // namespace tests::storePerfTools - -#endif // tests_storePerftools_asyncPerf_QueuedMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp deleted file mode 100644 index bacf438b9f..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleMessage.cpp - */ - -#include "SimpleMessage.h" - -#include // memcpy() - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -SimpleMessage::SimpleMessage(const char* msgData, - const uint32_t msgSize) : - m_persistenceId(0ULL), - m_msg(msgData, static_cast(msgSize)), - m_store(0), - m_msgHandle(qpid::broker::MessageHandle()) -{} - -SimpleMessage::SimpleMessage(const char* msgData, - const uint32_t msgSize, - qpid::broker::AsyncStore* store) : - m_persistenceId(0ULL), - m_msg(msgData, static_cast(msgSize)), - m_store(store), - m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle()) -{} - -SimpleMessage::~SimpleMessage() -{} - -const qpid::broker::MessageHandle& -SimpleMessage::getHandle() const -{ - return m_msgHandle; -} - -qpid::broker::MessageHandle& -SimpleMessage::getHandle() -{ - return m_msgHandle; -} - -uint64_t -SimpleMessage::contentSize() const -{ - return static_cast(m_msg.size()); -} - -void -SimpleMessage::setPersistenceId(uint64_t id) const -{ - m_persistenceId = id; -} - -uint64_t -SimpleMessage::getPersistenceId() const -{ - return m_persistenceId; -} - -void -SimpleMessage::encode(qpid::framing::Buffer& buffer) const -{ - buffer.putRawData(m_msg); -} - -uint32_t -SimpleMessage::encodedSize() const -{ - return static_cast(m_msg.size()); -} - -void -SimpleMessage::allDequeuesComplete() -{} - -uint32_t -SimpleMessage::encodedHeaderSize() const -{ - return 0; -} - -bool -SimpleMessage::isPersistent() const -{ - return m_store != 0; -} - -uint64_t -SimpleMessage::getSize() -{ - return m_msg.size(); -} - -void -SimpleMessage::write(char* target) -{ - ::memcpy(target, m_msg.data(), m_msg.size()); -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h deleted file mode 100644 index 169f5a8959..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleMessage.h - */ - -#ifndef tests_storePerftools_asyncPerf_SimpleMessage_h_ -#define tests_storePerftools_asyncPerf_SimpleMessage_h_ - -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/MessageHandle.h" -#include "qpid/broker/PersistableMessage.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class SimpleMessage: public qpid::broker::PersistableMessage, - public qpid::broker::DataSource -{ -public: - SimpleMessage(const char* msgData, - const uint32_t msgSize); - SimpleMessage(const char* msgData, - const uint32_t msgSize, - qpid::broker::AsyncStore* store); - virtual ~SimpleMessage(); - const qpid::broker::MessageHandle& getHandle() const; - qpid::broker::MessageHandle& getHandle(); - uint64_t contentSize() const; - - // --- Interface Persistable --- - virtual void setPersistenceId(uint64_t id) const; - virtual uint64_t getPersistenceId() const; - virtual void encode(qpid::framing::Buffer& buffer) const; - virtual uint32_t encodedSize() const; - - // --- Interface PersistableMessage --- - virtual void allDequeuesComplete(); - virtual uint32_t encodedHeaderSize() const; - virtual bool isPersistent() const; - - // --- Interface DataSource --- - virtual uint64_t getSize(); // <- same as encodedSize()? - virtual void write(char* target); - -private: - mutable uint64_t m_persistenceId; - const std::string m_msg; - qpid::broker::AsyncStore* m_store; - - qpid::broker::MessageHandle m_msgHandle; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_SimpleMessage_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp deleted file mode 100644 index 06b4e9333f..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ /dev/null @@ -1,457 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleQueue.cpp - */ - -#include "SimpleQueue.h" - -#include "DeliveryRecord.h" -#include "MessageConsumer.h" -#include "MessageDeque.h" -#include "QueuedMessage.h" -#include "SimpleMessage.h" - -#include "qpid/broker/AsyncResultHandle.h" -#include "qpid/broker/QueueAsyncContext.h" -#include "qpid/broker/TxnBuffer.h" - -#include // memcpy() - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -//static -qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations - - -SimpleQueue::SimpleQueue(const std::string& name, - const qpid::framing::FieldTable& /*args*/, - qpid::broker::AsyncStore* store, - qpid::broker::AsyncResultQueue& arq) : - qpid::broker::PersistableQueue(), - m_name(name), - m_store(store), - m_resultQueue(arq), - m_asyncOpCounter(0UL), - m_persistenceId(0ULL), - m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. - m_destroyPending(false), - m_destroyed(false), - m_barrier(*this), - m_messages(new MessageDeque()) -{ - if (m_store != 0) { - const qpid::types::Variant::Map qo; - m_queueHandle = m_store->createQueueHandle(m_name, qo); - } -} - -SimpleQueue::~SimpleQueue() {} - -const qpid::broker::QueueHandle& -SimpleQueue::getHandle() const { - return m_queueHandle; -} - -qpid::broker::QueueHandle& -SimpleQueue::getHandle() { - return m_queueHandle; -} - -qpid::broker::AsyncStore* -SimpleQueue::getStore() { - return m_store; -} - -void -SimpleQueue::asyncCreate() { - if (m_store) { - boost::shared_ptr qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - &handleAsyncCreateResult, - &m_resultQueue)); - m_store->submitCreate(m_queueHandle, this, qac); - ++m_asyncOpCounter; - } -} - -//static -void -SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr qc = - boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); - boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - sq->createComplete(qc); - } - } -} - -void -SimpleQueue::asyncDestroy(const bool deleteQueue) -{ - m_destroyPending = true; - if (m_store) { - if (deleteQueue) { - boost::shared_ptr qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - &handleAsyncDestroyResult, - &m_resultQueue)); - m_store->submitDestroy(m_queueHandle, qac); - ++m_asyncOpCounter; - } - m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); - } -} - -//static -void -SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr qc = - boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); - boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - sq->destroyComplete(qc); - } - } -} - -void -SimpleQueue::deliver(boost::intrusive_ptr msg) { - boost::shared_ptr qm(boost::shared_ptr(new QueuedMessage(this, msg))); - enqueue(qm); - push(qm); -} - -bool -SimpleQueue::dispatch(MessageConsumer& mc) { - boost::shared_ptr qm; - if (m_messages->consume(qm)) { - boost::shared_ptr dr(new DeliveryRecord(qm, mc, false)); - mc.record(dr); - return true; - } - return false; -} - -bool -SimpleQueue::enqueue(boost::shared_ptr qm) { - return enqueue(0, qm); -} - -bool -SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr qm) { - ScopedUse u(m_barrier); - if (!u.m_acquired) { - return false; - } - if (qm->payload()->isPersistent() && m_store) { - qm->payload()->enqueueAsync(shared_from_this(), m_store); - return asyncEnqueue(tb, qm); - } - return false; -} - -bool -SimpleQueue::dequeue(boost::shared_ptr qm) { - return dequeue(0, qm); -} - -bool -SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr qm) { - ScopedUse u(m_barrier); - if (!u.m_acquired) { - return false; - } - if (qm->payload()->isPersistent() && m_store) { - qm->payload()->dequeueAsync(shared_from_this(), m_store); - return asyncDequeue(tb, qm); - } - return true; -} - -void -SimpleQueue::process(boost::intrusive_ptr msg) { - push(boost::shared_ptr(new QueuedMessage(this, msg))); -} - -void -SimpleQueue::enqueueAborted(boost::intrusive_ptr) {} - -void -SimpleQueue::encode(qpid::framing::Buffer& buffer) const { - buffer.putShortString(m_name); -} - -uint32_t -SimpleQueue::encodedSize() const { - return m_name.size() + 1; -} - -uint64_t -SimpleQueue::getPersistenceId() const { - return m_persistenceId; -} - -void -SimpleQueue::setPersistenceId(uint64_t persistenceId) const { - m_persistenceId = persistenceId; -} - -void -SimpleQueue::flush() { - //if(m_store) m_store->flush(*this); -} - -const std::string& -SimpleQueue::getName() const { - return m_name; -} - -void -SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) { - if (externalQueueStore != inst && externalQueueStore) - delete externalQueueStore; - externalQueueStore = inst; -} - -uint64_t -SimpleQueue::getSize() { - return m_persistableData.size(); -} - -void -SimpleQueue::write(char* target) { - ::memcpy(target, m_persistableData.data(), m_persistableData.size()); -} - -// --- Members & methods in msg handling path from qpid::Queue --- - -// protected -SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) : - m_parent(q), - m_count(0) -{} - -// protected -bool -SimpleQueue::UsageBarrier::acquire() -{ - qpid::sys::Monitor::ScopedLock l(m_monitor); - if (m_parent.m_destroyed) { - return false; - } else { - ++m_count; - return true; - } -} - -// protected -void SimpleQueue::UsageBarrier::release() -{ - qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); - if (--m_count == 0) { - m_monitor.notifyAll(); - } -} - -// protected -void SimpleQueue::UsageBarrier::destroy() -{ - qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); - m_parent.m_destroyed = true; - while (m_count) { - m_monitor.wait(); - } -} - -// protected -SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) : - m_barrier(b), - m_acquired(m_barrier.acquire()) -{} - -// protected -SimpleQueue::ScopedUse::~ScopedUse() -{ - if (m_acquired) { - m_barrier.release(); - } -} - -// private -void -SimpleQueue::push(boost::shared_ptr qm, - bool /*isRecovery*/) -{ - m_messages->push(qm); -} - -// --- End Members & methods in msg handling path from qpid::Queue --- - -// private -bool -SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr qm) { - assert(qm.get()); - boost::shared_ptr qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - qm->payload(), - tb, - &handleAsyncEnqueueResult, - &m_resultQueue)); - if (tb) { - tb->incrOpCnt(); - m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac); - } else { - m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac); - } - ++m_asyncOpCounter; - return true; -} - -// private static -void -SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr qc = - boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); - boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - sq->enqueueComplete(qc); - } - } -} - -// private -bool -SimpleQueue::asyncDequeue(/*boost::shared_ptr*/qpid::broker::TxnBuffer* tb, - boost::shared_ptr qm) -{ - assert(qm.get()); - boost::shared_ptr qac(new qpid::broker::QueueAsyncContext(shared_from_this(), - qm->payload(), - tb, - &handleAsyncDequeueResult, - &m_resultQueue)); - if (tb) { - tb->incrOpCnt(); - m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac); - } else { - m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac); - } - ++m_asyncOpCounter; - return true; -} -// private static -void -SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr qc = - boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); - boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); - if (arh->getErrNo()) { - // TODO: Handle async failure here (other than by simply printing a message) - std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " - << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; - } else { - sq->dequeueComplete(qc); - } - } -} - -// private -void -SimpleQueue::destroyCheck(const std::string& opDescr) const { - if (m_destroyPending || m_destroyed) { - std::ostringstream oss; - oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; - throw qpid::Exception(oss.str()); - } -} - -// private -void -SimpleQueue::createComplete(const boost::shared_ptr qc) { - if (qc.get()) { - assert(qc->getQueue().get() == this); - } - --m_asyncOpCounter; -} - -// private -void -SimpleQueue::flushComplete(const boost::shared_ptr qc) { - if (qc.get()) { - assert(qc->getQueue().get() == this); - } - --m_asyncOpCounter; -} - -// private -void -SimpleQueue::destroyComplete(const boost::shared_ptr qc) { - if (qc.get()) { - assert(qc->getQueue().get() == this); - } - --m_asyncOpCounter; - m_destroyed = true; -} - -// private -void -SimpleQueue::enqueueComplete(const boost::shared_ptr qc) { - if (qc.get()) { - assert(qc->getQueue().get() == this); - if (qc->getTxnBuffer()) { // transactional enqueue - qc->getTxnBuffer()->decrOpCnt(); - } - } - --m_asyncOpCounter; -} - -// private -void -SimpleQueue::dequeueComplete(const boost::shared_ptr qc) { - if (qc.get()) { - assert(qc->getQueue().get() == this); - if (qc->getTxnBuffer()) { // transactional enqueue - qc->getTxnBuffer()->decrOpCnt(); - } - } - --m_asyncOpCounter; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h deleted file mode 100644 index 5f64c9b960..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file SimpleQueue.h - */ - -#ifndef tests_storePerftools_asyncPerf_SimpleQueue_h_ -#define tests_storePerftools_asyncPerf_SimpleQueue_h_ - -#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter -#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource -#include "qpid/broker/PersistableQueue.h" -#include "qpid/broker/QueueHandle.h" -#include "qpid/sys/Monitor.h" - -#include -#include -#include - -namespace qpid { -namespace broker { -class AsyncResultQueue; -class QueueAsyncContext; -class TxnBuffer; -} -namespace framing { -class FieldTable; -}} - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class MessageConsumer; -class Messages; -class QueuedMessage; -class SimpleMessage; - -class SimpleQueue : public boost::enable_shared_from_this, - public qpid::broker::PersistableQueue, - public qpid::broker::DataSource -{ -public: - SimpleQueue(const std::string& name, - const qpid::framing::FieldTable& args, - qpid::broker::AsyncStore* store, - qpid::broker::AsyncResultQueue& arq); - virtual ~SimpleQueue(); - - const qpid::broker::QueueHandle& getHandle() const; - qpid::broker::QueueHandle& getHandle(); - qpid::broker::AsyncStore* getStore(); - - void asyncCreate(); - static void handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh); - void asyncDestroy(const bool deleteQueue); - static void handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh); - - // --- Methods in msg handling path from qpid::Queue --- - void deliver(boost::intrusive_ptr msg); - bool dispatch(MessageConsumer& mc); - bool enqueue(boost::shared_ptr qm); - bool enqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr qm); - bool dequeue(boost::shared_ptr qm); - bool dequeue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr qm); - void process(boost::intrusive_ptr msg); - void enqueueAborted(boost::intrusive_ptr msg); - - // --- Interface qpid::broker::Persistable --- - virtual void encode(qpid::framing::Buffer& buffer) const; - virtual uint32_t encodedSize() const; - virtual uint64_t getPersistenceId() const; - virtual void setPersistenceId(uint64_t persistenceId) const; - - // --- Interface qpid::broker::PersistableQueue --- - virtual void flush(); - virtual const std::string& getName() const; - virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst); - - // --- Interface qpid::broker::DataStore --- - virtual uint64_t getSize(); - virtual void write(char* target); - -private: - static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations - - const std::string m_name; - qpid::broker::AsyncStore* m_store; - qpid::broker::AsyncResultQueue& m_resultQueue; - qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter! - mutable uint64_t m_persistenceId; - std::string m_persistableData; - qpid::broker::QueueHandle m_queueHandle; - bool m_destroyPending; - bool m_destroyed; - - // --- Members & methods in msg handling path copied from qpid::Queue --- - struct UsageBarrier { - SimpleQueue& m_parent; - uint32_t m_count; - qpid::sys::Monitor m_monitor; - UsageBarrier(SimpleQueue& q); - bool acquire(); - void release(); - void destroy(); - }; - struct ScopedUse { - UsageBarrier& m_barrier; - const bool m_acquired; - ScopedUse(UsageBarrier& b); - ~ScopedUse(); - }; - UsageBarrier m_barrier; - std::auto_ptr m_messages; - void push(boost::shared_ptr qm, - bool isRecovery = false); - - // -- Async ops --- - bool asyncEnqueue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr qm); - static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh); - bool asyncDequeue(qpid::broker::TxnBuffer* tb, - boost::shared_ptr qm); - static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh); - - // --- Async op counter --- - void destroyCheck(const std::string& opDescr) const; - - // --- Async op completions (called through handleAsyncResult) --- - void createComplete(const boost::shared_ptr qc); - void flushComplete(const boost::shared_ptr qc); - void destroyComplete(const boost::shared_ptr qc); - void enqueueComplete(const boost::shared_ptr qc); - void dequeueComplete(const boost::shared_ptr qc); -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_SimpleQueue_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp index 8c1f2976bf..20e9c39f1b 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp @@ -62,12 +62,10 @@ TestOptions::TestOptions(const uint32_t numMsgs, doAddOptions(); } -TestOptions::~TestOptions() -{} +TestOptions::~TestOptions() {} void -TestOptions::printVals(std::ostream& os) const -{ +TestOptions::printVals(std::ostream& os) const { tests::storePerftools::common::TestOptions::printVals(os); os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl; os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl; @@ -77,8 +75,7 @@ TestOptions::printVals(std::ostream& os) const // private void -TestOptions::doAddOptions() -{ +TestOptions::doAddOptions() { addOptions() ("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"), "Num enqueus per transaction (0 = no transactions)") diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp index cf6f293494..312fa187b8 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp @@ -32,12 +32,10 @@ TestResult::TestResult(const TestOptions& to) : m_testOpts(to) {} -TestResult::~TestResult() -{} +TestResult::~TestResult() {} void -TestResult::toStream(std::ostream& os) const -{ +TestResult::toStream(std::ostream& os) const { double msgsRate; os << "TEST RESULTS:" << std::endl; os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl; diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp deleted file mode 100644 index 375cd568d2..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnAccept.cpp - */ - -#include "TxnAccept.h" - -#include "DeliveryRecord.h" - -#include "qpid/log/Statement.h" - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -TxnAccept::TxnAccept(std::deque >& ops) : - m_ops(ops) -{} - -TxnAccept::~TxnAccept() {} - -// --- Interface TxnOp --- - -bool -TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() { - try { - for (std::deque >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - (*i)->dequeue(tb); - } - return true; - } catch (const std::exception& e) { - QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)"); - } - return false; -} - -void -TxnAccept::commit() throw() { - try { - for (std::deque >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { - (*i)->committed(); - (*i)->setEnded(); - } - } catch (const std::exception& e) { - QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what()); - } catch(...) { - QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)"); - } -} - -void -TxnAccept::rollback() throw() {} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h deleted file mode 100644 index 5d84289965..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnAccept.h - */ - -#ifndef tests_storePerftools_asyncPerf_TxnAccept_h_ -#define tests_storePerftools_asyncPerf_TxnAccept_h_ - -#include "qpid/broker/TxnOp.h" - -#include "boost/shared_ptr.hpp" -#include - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class DeliveryRecord; - -class TxnAccept: public qpid::broker::TxnOp { -public: - TxnAccept(std::deque >& ops); - virtual ~TxnAccept(); - - // --- Interface TxnOp --- - bool prepare(qpid::broker::TxnBuffer* tb) throw(); - void commit() throw(); - void rollback() throw(); -private: - std::deque > m_ops; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_TxnAccept_h_ diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp deleted file mode 100644 index cc36a38be7..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnPublish.cpp - */ - -#include "TxnPublish.h" - -#include "QueuedMessage.h" -#include "SimpleMessage.h" -#include "SimpleQueue.h" - -#include "qpid/log/Statement.h" -#include - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -TxnPublish::TxnPublish(boost::intrusive_ptr msg) : - m_msg(msg) -{} - -TxnPublish::~TxnPublish() {} - -bool -TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() { - try { - while (!m_queues.empty()) { - m_queues.front()->prepareEnqueue(tb); - m_prepared.push_back(m_queues.front()); - m_queues.pop_front(); - } - return true; - } catch (const std::exception& e) { - QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)"); - } - return false; -} - -void -TxnPublish::commit() throw() { - try { - for (std::list >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { - (*i)->commitEnqueue(); - } - } catch (const std::exception& e) { - QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)"); - } -} - -void -TxnPublish::rollback() throw() { - try { - for (std::list >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { - (*i)->abortEnqueue(); - } - } catch (const std::exception& e) { - QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)"); - } -} - -uint64_t -TxnPublish::contentSize() { - return m_msg->contentSize(); -} - -void -TxnPublish::deliverTo(const boost::shared_ptr& queue) { - m_queues.push_back(boost::shared_ptr(new QueuedMessage(queue.get(), m_msg))); - m_delivered = true; -} - -SimpleMessage& -TxnPublish::getMessage() { - return *m_msg; -} - -}}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h deleted file mode 100644 index eae9ef9c4c..0000000000 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnPublish.h - */ - -#ifndef tests_storePerftools_asyncPerf_TxnPublish_h_ -#define tests_storePerftools_asyncPerf_TxnPublish_h_ - -#include "Deliverable.h" - -#include "qpid/broker/TxnOp.h" - -#include -#include -#include - -namespace tests { -namespace storePerftools { -namespace asyncPerf { - -class QueuedMessage; -class SimpleMessage; -class SimpleQueue; - -class TxnPublish : public qpid::broker::TxnOp, - public Deliverable -{ -public: - TxnPublish(boost::intrusive_ptr msg); - virtual ~TxnPublish(); - - // --- Interface TxOp --- - bool prepare(qpid::broker::TxnBuffer* tb) throw(); - void commit() throw(); - void rollback() throw(); - - // --- Interface Deliverable --- - uint64_t contentSize(); - void deliverTo(const boost::shared_ptr& queue); - SimpleMessage& getMessage(); - -private: - boost::intrusive_ptr m_msg; - std::list > m_queues; - std::list > m_prepared; -}; - -}}} // namespace tests::storePerftools::asyncPerf - -#endif // tests_storePerftools_asyncPerf_TxnPublish_h_ -- cgit v1.2.1