summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-01 14:05:21 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-01 14:05:21 +0000
commit80bfab9ed823cebd9f8f58b559fd32df108bcf7d (patch)
tree191bf724b9bf5b8394343d60ac4eac804e9c3d3a /cpp/src/tests
parent63c6598f401ac6406e5a31c602c7892b798536fc (diff)
downloadqpid-python-80bfab9ed823cebd9f8f58b559fd32df108bcf7d.tar.gz
QPID-3858: WIP: Moving Simple* test classes into the correct namespaces so as to correspond with broker classes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368006 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/asyncstore.cmake18
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp43
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/Deliverable.h54
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp102
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h66
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp64
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h56
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp25
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h24
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp65
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h59
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp23
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h16
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/Messages.h53
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp36
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.h9
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp109
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h74
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp122
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h74
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp457
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h158
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp9
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp74
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h53
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp102
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h68
28 files changed, 75 insertions, 1944 deletions
diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake
index 94631bb8ea..3dcd81c3d7 100644
--- a/cpp/src/tests/asyncstore.cmake
+++ b/cpp/src/tests/asyncstore.cmake
@@ -51,20 +51,20 @@ endif (UNIX)
# Async store perf test (asyncPerf)
set (asyncStorePerf_SOURCES
- storePerftools/asyncPerf/Deliverable.cpp
- storePerftools/asyncPerf/DeliveryRecord.cpp
- storePerftools/asyncPerf/MessageAsyncContext.cpp
storePerftools/asyncPerf/MessageConsumer.cpp
- storePerftools/asyncPerf/MessageDeque.cpp
storePerftools/asyncPerf/MessageProducer.cpp
storePerftools/asyncPerf/PerfTest.cpp
- storePerftools/asyncPerf/QueuedMessage.cpp
- storePerftools/asyncPerf/SimpleMessage.cpp
- storePerftools/asyncPerf/SimpleQueue.cpp
+# storePerftools/asyncPerf/SimpleDeliverable.cpp
+# storePerftools/asyncPerf/SimpleDeliveryRecord.cpp
+# storePerftools/asyncPerf/SimpleMessage.cpp
+# storePerftools/asyncPerf/SimpleMessageAsyncContext.cpp
+# storePerftools/asyncPerf/SimpleMessageDeque.cpp
+# storePerftools/asyncPerf/SimpleQueue.cpp
+# storePerftools/asyncPerf/SimpleQueuedMessage.cpp
+# storePerftools/asyncPerf/SimpleTxnAccept.cpp
+# storePerftools/asyncPerf/SimpleTxnPublish.cpp
storePerftools/asyncPerf/TestOptions.cpp
storePerftools/asyncPerf/TestResult.cpp
- storePerftools/asyncPerf/TxnAccept.cpp
- storePerftools/asyncPerf/TxnPublish.cpp
storePerftools/common/Parameters.cpp
storePerftools/common/PerftoolError.cpp
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp
deleted file mode 100644
index 9da7e348e0..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file Deliverable.cpp
- */
-
-#include "Deliverable.h"
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-Deliverable::Deliverable() :
- m_delivered(false)
-{}
-
-Deliverable::~Deliverable()
-{}
-
-bool
-Deliverable::isDelivered() const
-{
- return m_delivered;
-}
-
-}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h
deleted file mode 100644
index 990d53a199..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file Deliverable.h
- */
-
-#ifndef tests_storePerftools_asyncPerf_Deliverable_h_
-#define tests_storePerftools_asyncPerf_Deliverable_h_
-
-#include <boost/shared_ptr.hpp>
-#include <stdint.h> // uint64_t
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class SimpleMessage;
-class SimpleQueue;
-
-class Deliverable
-{
-public:
- Deliverable();
- virtual ~Deliverable();
-
- virtual uint64_t contentSize() = 0;
- virtual void deliverTo(const boost::shared_ptr<SimpleQueue>& queue) = 0;
- virtual SimpleMessage& getMessage() = 0;
- virtual bool isDelivered() const;
-
-protected:
- bool m_delivered;
-};
-
-}}} // namespace tests::storePerftools::asyncPerf
-
-#endif // tests_storePerftools_asyncPerf_Deliverable_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
deleted file mode 100644
index 6f33369a26..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file DeliveryRecord.cpp
- */
-
-#include "DeliveryRecord.h"
-
-#include "MessageConsumer.h"
-#include "QueuedMessage.h"
-#include "SimpleMessage.h"
-#include "SimpleQueue.h"
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-DeliveryRecord::DeliveryRecord(boost::shared_ptr<QueuedMessage> qm,
- MessageConsumer& mc,
- bool accepted) :
- m_queuedMessage(qm),
- m_msgConsumer(mc),
- m_accepted(accepted),
- m_ended(accepted)
-{}
-
-DeliveryRecord::~DeliveryRecord()
-{}
-
-bool
-DeliveryRecord::accept()
-{
- if (!m_ended) {
- m_queuedMessage->getQueue()->dequeue(m_queuedMessage);
- m_accepted = true;
- setEnded();
- }
- return isRedundant();
-}
-
-bool
-DeliveryRecord::isAccepted() const
-{
- return m_accepted;
-}
-
-bool
-DeliveryRecord::setEnded()
-{
- m_ended = true;
- m_queuedMessage->payload() = boost::intrusive_ptr<SimpleMessage>(0);
- return isRedundant();
-}
-
-bool
-DeliveryRecord::isEnded() const
-{
- return m_ended;
-}
-
-bool
-DeliveryRecord::isRedundant() const
-{
- return m_ended;
-}
-
-void
-DeliveryRecord::dequeue(qpid::broker::TxnBuffer* tb)
-{
- m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage);
-}
-
-void
-DeliveryRecord::committed() const
-{
- m_msgConsumer.commitComplete();
-}
-
-boost::shared_ptr<QueuedMessage>
-DeliveryRecord::getQueuedMessage() const
-{
- return m_queuedMessage;
-}
-
-}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
deleted file mode 100644
index 6c5d87f374..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file DeliveryRecord.h
- */
-
-#ifndef tests_storePerftools_asyncPerf_DeliveryRecord_h_
-#define tests_storePerftools_asyncPerf_DeliveryRecord_h_
-
-//#include "QueuedMessage.h"
-
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-namespace broker {
-class TxnBuffer;
-}}
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class MessageConsumer;
-class QueuedMessage;
-
-class DeliveryRecord {
-public:
- DeliveryRecord(boost::shared_ptr<QueuedMessage> qm,
- MessageConsumer& mc,
- bool accepted);
- virtual ~DeliveryRecord();
- bool accept();
- bool isAccepted() const;
- bool setEnded();
- bool isEnded() const;
- bool isRedundant() const;
- void dequeue(qpid::broker::TxnBuffer* tb);
- void committed() const;
- boost::shared_ptr<QueuedMessage> getQueuedMessage() const;
-private:
- boost::shared_ptr<QueuedMessage> m_queuedMessage;
- MessageConsumer& m_msgConsumer;
- bool m_accepted : 1;
- bool m_ended : 1;
-};
-
-}}} // namespace tests::storePerftools::asyncPerf
-
-#endif // tests_storePerftools_asyncPerf_DeliveryRecord_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
deleted file mode 100644
index e3bfe9ae7a..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file MessageContext.cpp
- */
-
-#include "MessageAsyncContext.h"
-
-#include "SimpleMessage.h"
-
-#include <cassert>
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
- boost::shared_ptr<SimpleQueue> q) :
- m_msg(msg),
- m_q(q)
-{
- assert(m_msg.get() != 0);
- assert(m_q.get() != 0);
-}
-
-MessageAsyncContext::~MessageAsyncContext()
-{}
-
-boost::intrusive_ptr<SimpleMessage>
-MessageAsyncContext::getMessage() const
-{
- return m_msg;
-}
-
-boost::shared_ptr<SimpleQueue>
-MessageAsyncContext::getQueue() const
-{
- return m_q;
-}
-
-void
-MessageAsyncContext::destroy()
-{
- delete this;
-}
-
-}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
deleted file mode 100644
index 9252fbda45..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file MessageContext.h
- */
-
-#ifndef tests_storePerfTools_asyncPerf_MessageContext_h_
-#define tests_storePerfTools_asyncPerf_MessageContext_h_
-
-#include "qpid/asyncStore/AsyncOperation.h"
-
-#include <boost/intrusive_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class SimpleMessage;
-class SimpleQueue;
-
-class MessageAsyncContext : public qpid::broker::BrokerAsyncContext
-{
-public:
- MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
- boost::shared_ptr<SimpleQueue> q);
- virtual ~MessageAsyncContext();
- boost::intrusive_ptr<SimpleMessage> getMessage() const;
- boost::shared_ptr<SimpleQueue> getQueue() const;
- void destroy();
-
-private:
- boost::intrusive_ptr<SimpleMessage> m_msg;
- boost::shared_ptr<SimpleQueue> m_q;
-};
-
-}}} // namespace tests::storePerftools::asyncPerf
-
-#endif // tests_storePerfTools_asyncPerf_MessageContext_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 6aa477c470..6477696bd6 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -23,13 +23,12 @@
#include "MessageConsumer.h"
-#include "DeliveryRecord.h"
-#include "SimpleQueue.h"
#include "TestOptions.h"
-#include "TxnAccept.h"
-#include "qpid/asyncStore/AsyncStoreImpl.h"
-#include "qpid/broker/TxnBuffer.h"
+#include "qpid/broker/SimpleDeliveryRecord.h"
+#include "qpid/broker/SimpleQueue.h"
+#include "qpid/broker/SimpleTxnAccept.h"
+#include "qpid/broker/SimpleTxnBuffer.h"
#include <stdint.h> // uint32_t
@@ -38,9 +37,9 @@ namespace storePerftools {
namespace asyncPerf {
MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimpleQueue> queue) :
+ boost::shared_ptr<qpid::broker::SimpleQueue> queue) :
m_perfTestParams(perfTestParams),
m_store(store),
m_resultQueue(arq),
@@ -50,7 +49,7 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
MessageConsumer::~MessageConsumer() {}
void
-MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) {
+MessageConsumer::record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr) {
m_unacked.push_back(dr);
}
@@ -61,9 +60,9 @@ void*
MessageConsumer::runConsumers() {
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
uint16_t opsInTxnCnt = 0U;
- qpid::broker::TxnBuffer* tb = 0;
+ qpid::broker::SimpleTxnBuffer* tb = 0;
if (useTxns) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
uint32_t msgsPerConsumer = m_perfTestParams.m_numEnqThreadsPerQueue * m_perfTestParams.m_numMsgs /
@@ -74,19 +73,19 @@ MessageConsumer::runConsumers() {
++numMsgs;
if (useTxns) {
// --- Transactional dequeue ---
- boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
+ boost::shared_ptr<qpid::broker::SimpleTxnAccept> ta(new qpid::broker::SimpleTxnAccept(m_unacked));
m_unacked.clear();
tb->enlist(ta);
if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
tb->commitLocal(m_store);
if (numMsgs < m_perfTestParams.m_numMsgs) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
opsInTxnCnt = 0U;
}
} else {
// --- Non-transactional dequeue ---
- for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) {
+ for (std::deque<boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) {
(*i)->accept();
}
m_unacked.clear();
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
index b110520889..d5a881f7e0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
@@ -24,44 +24,44 @@
#ifndef tests_storePerftools_asyncPerf_MessageConsumer_h_
#define tests_storePerftools_asyncPerf_MessageConsumer_h_
+#include "qpid/broker/SimpleConsumer.h"
+
#include "boost/shared_ptr.hpp"
#include <deque>
namespace qpid {
-namespace asyncStore {
-class AsyncStoreImpl;
-}
namespace broker {
class AsyncResultQueue;
+class AsyncStore;
+class SimpleDeliveryRecord;
+class SimpleQueue;
}}
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class DeliveryRecord;
-class SimpleQueue;
class TestOptions;
-class MessageConsumer
+class MessageConsumer: public qpid::broker::SimpleConsumer
{
public:
MessageConsumer(const TestOptions& perfTestParams,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimpleQueue> queue);
+ boost::shared_ptr<qpid::broker::SimpleQueue> queue);
virtual ~MessageConsumer();
- void record(boost::shared_ptr<DeliveryRecord> dr);
+ void record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr);
void commitComplete();
void* runConsumers();
static void* startConsumers(void* ptr);
private:
const TestOptions& m_perfTestParams;
- qpid::asyncStore::AsyncStoreImpl* m_store;
+ qpid::broker::AsyncStore* m_store;
qpid::broker::AsyncResultQueue& m_resultQueue;
- boost::shared_ptr<SimpleQueue> m_queue;
- std::deque<boost::shared_ptr<DeliveryRecord> > m_unacked;
+ boost::shared_ptr<qpid::broker::SimpleQueue> m_queue;
+ std::deque<boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> > m_unacked;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
deleted file mode 100644
index 1fa2c087ac..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file MessageDeque.cpp
- */
-
-#include "MessageDeque.h"
-
-#include "QueuedMessage.h"
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-MessageDeque::MessageDeque()
-{}
-
-MessageDeque::~MessageDeque()
-{}
-
-uint32_t
-MessageDeque::size()
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
- return m_messages.size();
-}
-
-bool
-MessageDeque::push(boost::shared_ptr<QueuedMessage>& added)
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
- m_messages.push_back(added);
- return false;
-}
-
-bool
-MessageDeque::consume(boost::shared_ptr<QueuedMessage>& msg)
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
- if (!m_messages.empty()) {
- msg = m_messages.front();
- m_messages.pop_front();
- return true;
- }
- return false;
-}
-
-}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
deleted file mode 100644
index 021015f3e0..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file MessageDeque.h
- */
-
-/*
- * This is a copy of qpid::broker::MessageDeque.h, but using the local
- * tests::storePerftools::asyncPerf::QueuedMessage class instead of
- * qpid::broker::QueuedMessage.
- */
-
-#ifndef tests_storePerftools_asyncPerf_MessageDeque_h_
-#define tests_storePerftools_asyncPerf_MessageDeque_h_
-
-#include "Messages.h"
-
-#include "qpid/sys/Mutex.h"
-
-#include <deque>
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class MessageDeque : public Messages
-{
-public:
- MessageDeque();
- virtual ~MessageDeque();
- uint32_t size();
- bool push(boost::shared_ptr<QueuedMessage>& added);
- bool consume(boost::shared_ptr<QueuedMessage>& msg);
-private:
- std::deque<boost::shared_ptr<QueuedMessage> > m_messages;
- qpid::sys::Mutex m_msgMutex;
-
-};
-
-}}} // namespace tests::storePerftools::asyncPerf
-
-#endif // tests_storePerftools_asyncPerf_MessageDeque_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
index 974f3f3981..f88d305a38 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
@@ -23,13 +23,12 @@
#include "MessageProducer.h"
-#include "SimpleMessage.h"
-#include "SimpleQueue.h"
#include "TestOptions.h"
-#include "TxnPublish.h"
-#include "qpid/asyncStore/AsyncStoreImpl.h"
-#include "qpid/broker/TxnBuffer.h"
+#include "qpid/broker/SimpleMessage.h"
+#include "qpid/broker/SimpleQueue.h"
+#include "qpid/broker/SimpleTxnBuffer.h"
+#include "qpid/broker/SimpleTxnPublish.h"
#include <stdint.h> // uint32_t
@@ -39,9 +38,9 @@ namespace asyncPerf {
MessageProducer::MessageProducer(const TestOptions& perfTestParams,
const char* msgData,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimpleQueue> queue) :
+ boost::shared_ptr<qpid::broker::SimpleQueue> queue) :
m_perfTestParams(perfTestParams),
m_msgData(msgData),
m_store(store),
@@ -55,14 +54,14 @@ void*
MessageProducer::runProducers() {
const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U;
uint16_t recsInTxnCnt = 0U;
- qpid::broker::TxnBuffer* tb = 0;
+ qpid::broker::SimpleTxnBuffer* tb = 0;
if (useTxns) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) {
- boost::intrusive_ptr<SimpleMessage> msg(new SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store));
+ boost::intrusive_ptr<qpid::broker::SimpleMessage> msg(new qpid::broker::SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store));
if (useTxns) {
- boost::shared_ptr<TxnPublish> op(new TxnPublish(msg));
+ boost::shared_ptr<qpid::broker::SimpleTxnPublish> op(new qpid::broker::SimpleTxnPublish(msg));
op->deliverTo(m_queue);
tb->enlist(op);
if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) {
@@ -72,7 +71,7 @@ MessageProducer::runProducers() {
// transaction until the current commit cycle completes. So use another instance. This
// instance should auto-delete when the async commit cycle completes.
if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
recsInTxnCnt = 0U;
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
index 127408e3db..6f98d03503 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
@@ -27,19 +27,17 @@
#include "boost/shared_ptr.hpp"
namespace qpid {
-namespace asyncStore {
-class AsyncStoreImpl;
-}
namespace broker {
class AsyncResultQueue;
-class TxnBuffer;
+class AsyncStore;
+class SimpleQueue;
+class SimpleTxnBuffer;
}}
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimpleQueue;
class TestOptions;
class MessageProducer
@@ -47,18 +45,18 @@ class MessageProducer
public:
MessageProducer(const TestOptions& perfTestParams,
const char* msgData,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimpleQueue> queue);
+ boost::shared_ptr<qpid::broker::SimpleQueue> queue);
virtual ~MessageProducer();
void* runProducers();
static void* startProducers(void* ptr);
private:
const TestOptions& m_perfTestParams;
const char* m_msgData;
- qpid::asyncStore::AsyncStoreImpl* m_store;
+ qpid::broker::AsyncStore* m_store;
qpid::broker::AsyncResultQueue& m_resultQueue;
- boost::shared_ptr<SimpleQueue> m_queue;
+ boost::shared_ptr<qpid::broker::SimpleQueue> m_queue;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/tests/storePerftools/asyncPerf/Messages.h
deleted file mode 100644
index c1bfa328ea..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/Messages.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file Messages.h
- */
-
-/*
- * This is a copy of qpid::broker::Messages.h, but using the local
- * tests::storePerftools::asyncPerf::QueuedMessage class instead of
- * qpid::broker::QueuedMessage.
- */
-
-#ifndef tests_storePerftools_asyncPerf_Messages_h_
-#define tests_storePerftools_asyncPerf_Messages_h_
-
-#include <boost/shared_ptr.hpp>
-#include <stdint.h>
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class QueuedMessage;
-
-class Messages
-{
-public:
- virtual ~Messages() {}
- virtual uint32_t size() = 0;
- virtual bool push(boost::shared_ptr<QueuedMessage>& added) = 0;
- virtual bool consume(boost::shared_ptr<QueuedMessage>& msg) = 0;
-};
-
-}}} // namespace tests::storePerftools::asyncPerf
-
-#endif // tests_storePerftools_asyncPerf_Messages_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index 6377cc0d85..c05eb0487d 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -25,7 +25,6 @@
#include "MessageConsumer.h"
#include "MessageProducer.h"
-#include "SimpleQueue.h"
#include "tests/storePerftools/version.h"
#include "tests/storePerftools/common/ScopedTimer.h"
@@ -34,6 +33,7 @@
#include "qpid/Modules.h" // Use with loading store as module
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/asyncStore/AsyncStoreOptions.h"
+#include "qpid/broker/SimpleQueue.h"
#include <iomanip>
@@ -55,8 +55,7 @@ PerfTest::PerfTest(const TestOptions& to,
std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize);
}
-PerfTest::~PerfTest()
-{
+PerfTest::~PerfTest() {
m_poller->shutdown();
m_pollingThread.join();
@@ -68,8 +67,7 @@ PerfTest::~PerfTest()
}
void
-PerfTest::run()
-{
+PerfTest::run() {
if (m_testOpts.m_durable) {
prepareStore();
}
@@ -113,8 +111,7 @@ PerfTest::run()
}
void
-PerfTest::toStream(std::ostream& os) const
-{
+PerfTest::toStream(std::ostream& os) const {
m_testOpts.printVals(os);
os << std::endl;
m_storeOpts.printVals(os);
@@ -124,16 +121,15 @@ PerfTest::toStream(std::ostream& os) const
// private
void
-PerfTest::prepareStore()
-{
- m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts);
- m_store->initialize();
+PerfTest::prepareStore() {
+ qpid::asyncStore::AsyncStoreImpl* s = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts);
+ s->initialize();
+ m_store = s;
}
// private
void
-PerfTest::destroyStore()
-{
+PerfTest::destroyStore() {
if (m_store) {
delete m_store;
}
@@ -141,12 +137,11 @@ PerfTest::destroyStore()
// private
void
-PerfTest::prepareQueues()
-{
+PerfTest::prepareQueues() {
for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
std::ostringstream qname;
qname << "queue_" << std::setw(4) << std::setfill('0') << i;
- boost::shared_ptr<SimpleQueue> mpq(new SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue));
+ boost::shared_ptr<qpid::broker::SimpleQueue> mpq(new qpid::broker::SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue));
mpq->asyncCreate();
m_queueList.push_back(mpq);
}
@@ -154,8 +149,7 @@ PerfTest::prepareQueues()
// private
void
-PerfTest::destroyQueues()
-{
+PerfTest::destroyQueues() {
while (m_queueList.size() > 0) {
m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion);
m_queueList.pop_front();
@@ -163,8 +157,7 @@ PerfTest::destroyQueues()
}
int
-runPerfTest(int argc, char** argv)
-{
+runPerfTest(int argc, char** argv) {
// Load async store module
qpid::tryShlib ("asyncStore.so", false);
@@ -212,7 +205,6 @@ runPerfTest(int argc, char** argv)
// -----------------------------------------------------------------
int
-main(int argc, char** argv)
-{
+main(int argc, char** argv) {
return tests::storePerftools::asyncPerf::runPerfTest(argc, argv);
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
index e4d99021b5..27d8d08faf 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
@@ -37,9 +37,11 @@
namespace qpid {
namespace asyncStore {
-class AsyncStoreImpl;
class AsyncStoreOptions;
}
+namespace broker {
+class SimpleQueue;
+}
namespace sys {
class Poller;
}}
@@ -48,7 +50,6 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimpleQueue;
class MessageConsumer;
class MessageProducer;
class TestOptions;
@@ -73,8 +74,8 @@ private:
boost::shared_ptr<qpid::sys::Poller> m_poller;
qpid::sys::Thread m_pollingThread;
qpid::broker::AsyncResultQueueImpl m_resultQueue;
- qpid::asyncStore::AsyncStoreImpl* m_store;
- std::deque<boost::shared_ptr<SimpleQueue> > m_queueList;
+ qpid::broker::AsyncStore* m_store;
+ std::deque<boost::shared_ptr<qpid::broker::SimpleQueue> > m_queueList;
std::deque<boost::shared_ptr<MessageProducer> > m_producers;
std::deque<boost::shared_ptr<MessageConsumer> > m_consumers;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
deleted file mode 100644
index 0d16248c7f..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file QueuedMessage.cpp
- */
-
-#include "QueuedMessage.h"
-
-#include "SimpleMessage.h"
-#include "SimpleQueue.h"
-
-#include "qpid/asyncStore/AsyncStoreImpl.h"
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-QueuedMessage::QueuedMessage() :
- m_queue(0)
-{}
-
-QueuedMessage::QueuedMessage(SimpleQueue* q,
- boost::intrusive_ptr<SimpleMessage> msg) :
- boost::enable_shared_from_this<QueuedMessage>(),
- m_queue(q),
- m_msg(msg)
-{
- if (m_queue->getStore()) {
- m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle());
- }
-}
-
-QueuedMessage::QueuedMessage(const QueuedMessage& qm) :
- boost::enable_shared_from_this<QueuedMessage>(),
- m_queue(qm.m_queue),
- m_msg(qm.m_msg),
- m_enqHandle(qm.m_enqHandle)
-{}
-
-QueuedMessage::QueuedMessage(QueuedMessage* const qm) :
- boost::enable_shared_from_this<QueuedMessage>(),
- m_queue(qm->m_queue),
- m_msg(qm->m_msg),
- m_enqHandle(qm->m_enqHandle)
-{}
-
-QueuedMessage::~QueuedMessage()
-{}
-
-SimpleQueue*
-QueuedMessage::getQueue() const
-{
- return m_queue;
-}
-
-boost::intrusive_ptr<SimpleMessage>
-QueuedMessage::payload() const
-{
- return m_msg;
-}
-
-const qpid::broker::EnqueueHandle&
-QueuedMessage::enqHandle() const
-{
- return m_enqHandle;
-}
-
-qpid::broker::EnqueueHandle&
-QueuedMessage::enqHandle()
-{
- return m_enqHandle;
-}
-
-void
-QueuedMessage::prepareEnqueue(qpid::broker::TxnBuffer* tb)
-{
- m_queue->enqueue(tb, shared_from_this());
-}
-
-void
-QueuedMessage::commitEnqueue()
-{
- m_queue->process(m_msg);
-}
-
-void
-QueuedMessage::abortEnqueue()
-{
- m_queue->enqueueAborted(m_msg);
-}
-
-}}} // namespace tests::storePerfTools
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
deleted file mode 100644
index 630fe1aedc..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file QueuedMessage.h
- */
-
-#ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_
-#define tests_storePerftools_asyncPerf_QueuedMessage_h_
-
-#include "qpid/broker/AsyncStore.h"
-#include "qpid/broker/EnqueueHandle.h"
-
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-namespace broker {
-
-class TxnHandle;
-
-}}
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class SimpleMessage;
-class SimpleQueue;
-
-class QueuedMessage : public boost::enable_shared_from_this<QueuedMessage>
-{
-public:
- QueuedMessage();
- QueuedMessage(SimpleQueue* q,
- boost::intrusive_ptr<SimpleMessage> msg);
- QueuedMessage(const QueuedMessage& qm);
- QueuedMessage(QueuedMessage* const qm);
- virtual ~QueuedMessage();
- SimpleQueue* getQueue() const;
- boost::intrusive_ptr<SimpleMessage> payload() const;
- const qpid::broker::EnqueueHandle& enqHandle() const;
- qpid::broker::EnqueueHandle& enqHandle();
-
- // --- Transaction handling ---
- void prepareEnqueue(qpid::broker::TxnBuffer* tb);
- void commitEnqueue();
- void abortEnqueue();
-
-private:
- SimpleQueue* m_queue;
- boost::intrusive_ptr<SimpleMessage> m_msg;
- qpid::broker::EnqueueHandle m_enqHandle;
-};
-
-}}} // namespace tests::storePerfTools
-
-#endif // tests_storePerftools_asyncPerf_QueuedMessage_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
deleted file mode 100644
index bacf438b9f..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file SimpleMessage.cpp
- */
-
-#include "SimpleMessage.h"
-
-#include <string.h> // memcpy()
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-SimpleMessage::SimpleMessage(const char* msgData,
- const uint32_t msgSize) :
- m_persistenceId(0ULL),
- m_msg(msgData, static_cast<size_t>(msgSize)),
- m_store(0),
- m_msgHandle(qpid::broker::MessageHandle())
-{}
-
-SimpleMessage::SimpleMessage(const char* msgData,
- const uint32_t msgSize,
- qpid::broker::AsyncStore* store) :
- m_persistenceId(0ULL),
- m_msg(msgData, static_cast<size_t>(msgSize)),
- m_store(store),
- m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle())
-{}
-
-SimpleMessage::~SimpleMessage()
-{}
-
-const qpid::broker::MessageHandle&
-SimpleMessage::getHandle() const
-{
- return m_msgHandle;
-}
-
-qpid::broker::MessageHandle&
-SimpleMessage::getHandle()
-{
- return m_msgHandle;
-}
-
-uint64_t
-SimpleMessage::contentSize() const
-{
- return static_cast<uint64_t>(m_msg.size());
-}
-
-void
-SimpleMessage::setPersistenceId(uint64_t id) const
-{
- m_persistenceId = id;
-}
-
-uint64_t
-SimpleMessage::getPersistenceId() const
-{
- return m_persistenceId;
-}
-
-void
-SimpleMessage::encode(qpid::framing::Buffer& buffer) const
-{
- buffer.putRawData(m_msg);
-}
-
-uint32_t
-SimpleMessage::encodedSize() const
-{
- return static_cast<uint32_t>(m_msg.size());
-}
-
-void
-SimpleMessage::allDequeuesComplete()
-{}
-
-uint32_t
-SimpleMessage::encodedHeaderSize() const
-{
- return 0;
-}
-
-bool
-SimpleMessage::isPersistent() const
-{
- return m_store != 0;
-}
-
-uint64_t
-SimpleMessage::getSize()
-{
- return m_msg.size();
-}
-
-void
-SimpleMessage::write(char* target)
-{
- ::memcpy(target, m_msg.data(), m_msg.size());
-}
-
-}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
deleted file mode 100644
index 169f5a8959..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file SimpleMessage.h
- */
-
-#ifndef tests_storePerftools_asyncPerf_SimpleMessage_h_
-#define tests_storePerftools_asyncPerf_SimpleMessage_h_
-
-#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
-#include "qpid/broker/MessageHandle.h"
-#include "qpid/broker/PersistableMessage.h"
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class SimpleMessage: public qpid::broker::PersistableMessage,
- public qpid::broker::DataSource
-{
-public:
- SimpleMessage(const char* msgData,
- const uint32_t msgSize);
- SimpleMessage(const char* msgData,
- const uint32_t msgSize,
- qpid::broker::AsyncStore* store);
- virtual ~SimpleMessage();
- const qpid::broker::MessageHandle& getHandle() const;
- qpid::broker::MessageHandle& getHandle();
- uint64_t contentSize() const;
-
- // --- Interface Persistable ---
- virtual void setPersistenceId(uint64_t id) const;
- virtual uint64_t getPersistenceId() const;
- virtual void encode(qpid::framing::Buffer& buffer) const;
- virtual uint32_t encodedSize() const;
-
- // --- Interface PersistableMessage ---
- virtual void allDequeuesComplete();
- virtual uint32_t encodedHeaderSize() const;
- virtual bool isPersistent() const;
-
- // --- Interface DataSource ---
- virtual uint64_t getSize(); // <- same as encodedSize()?
- virtual void write(char* target);
-
-private:
- mutable uint64_t m_persistenceId;
- const std::string m_msg;
- qpid::broker::AsyncStore* m_store;
-
- qpid::broker::MessageHandle m_msgHandle;
-};
-
-}}} // namespace tests::storePerftools::asyncPerf
-
-#endif // tests_storePerftools_asyncPerf_SimpleMessage_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
deleted file mode 100644
index 06b4e9333f..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file SimpleQueue.cpp
- */
-
-#include "SimpleQueue.h"
-
-#include "DeliveryRecord.h"
-#include "MessageConsumer.h"
-#include "MessageDeque.h"
-#include "QueuedMessage.h"
-#include "SimpleMessage.h"
-
-#include "qpid/broker/AsyncResultHandle.h"
-#include "qpid/broker/QueueAsyncContext.h"
-#include "qpid/broker/TxnBuffer.h"
-
-#include <string.h> // memcpy()
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-//static
-qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations
-
-
-SimpleQueue::SimpleQueue(const std::string& name,
- const qpid::framing::FieldTable& /*args*/,
- qpid::broker::AsyncStore* store,
- qpid::broker::AsyncResultQueue& arq) :
- qpid::broker::PersistableQueue(),
- m_name(name),
- m_store(store),
- m_resultQueue(arq),
- m_asyncOpCounter(0UL),
- m_persistenceId(0ULL),
- m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this.
- m_destroyPending(false),
- m_destroyed(false),
- m_barrier(*this),
- m_messages(new MessageDeque())
-{
- if (m_store != 0) {
- const qpid::types::Variant::Map qo;
- m_queueHandle = m_store->createQueueHandle(m_name, qo);
- }
-}
-
-SimpleQueue::~SimpleQueue() {}
-
-const qpid::broker::QueueHandle&
-SimpleQueue::getHandle() const {
- return m_queueHandle;
-}
-
-qpid::broker::QueueHandle&
-SimpleQueue::getHandle() {
- return m_queueHandle;
-}
-
-qpid::broker::AsyncStore*
-SimpleQueue::getStore() {
- return m_store;
-}
-
-void
-SimpleQueue::asyncCreate() {
- if (m_store) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- &handleAsyncCreateResult,
- &m_resultQueue));
- m_store->submitCreate(m_queueHandle, this, qac);
- ++m_asyncOpCounter;
- }
-}
-
-//static
-void
-SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) {
- if (arh) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
- boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
- boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
- if (arh->getErrNo()) {
- // TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
- << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
- } else {
- sq->createComplete(qc);
- }
- }
-}
-
-void
-SimpleQueue::asyncDestroy(const bool deleteQueue)
-{
- m_destroyPending = true;
- if (m_store) {
- if (deleteQueue) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- &handleAsyncDestroyResult,
- &m_resultQueue));
- m_store->submitDestroy(m_queueHandle, qac);
- ++m_asyncOpCounter;
- }
- m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000));
- }
-}
-
-//static
-void
-SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) {
- if (arh) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
- boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
- boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
- if (arh->getErrNo()) {
- // TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
- << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
- } else {
- sq->destroyComplete(qc);
- }
- }
-}
-
-void
-SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) {
- boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
- enqueue(qm);
- push(qm);
-}
-
-bool
-SimpleQueue::dispatch(MessageConsumer& mc) {
- boost::shared_ptr<QueuedMessage> qm;
- if (m_messages->consume(qm)) {
- boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false));
- mc.record(dr);
- return true;
- }
- return false;
-}
-
-bool
-SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) {
- return enqueue(0, qm);
-}
-
-bool
-SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm) {
- ScopedUse u(m_barrier);
- if (!u.m_acquired) {
- return false;
- }
- if (qm->payload()->isPersistent() && m_store) {
- qm->payload()->enqueueAsync(shared_from_this(), m_store);
- return asyncEnqueue(tb, qm);
- }
- return false;
-}
-
-bool
-SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) {
- return dequeue(0, qm);
-}
-
-bool
-SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm) {
- ScopedUse u(m_barrier);
- if (!u.m_acquired) {
- return false;
- }
- if (qm->payload()->isPersistent() && m_store) {
- qm->payload()->dequeueAsync(shared_from_this(), m_store);
- return asyncDequeue(tb, qm);
- }
- return true;
-}
-
-void
-SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) {
- push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
-}
-
-void
-SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage>) {}
-
-void
-SimpleQueue::encode(qpid::framing::Buffer& buffer) const {
- buffer.putShortString(m_name);
-}
-
-uint32_t
-SimpleQueue::encodedSize() const {
- return m_name.size() + 1;
-}
-
-uint64_t
-SimpleQueue::getPersistenceId() const {
- return m_persistenceId;
-}
-
-void
-SimpleQueue::setPersistenceId(uint64_t persistenceId) const {
- m_persistenceId = persistenceId;
-}
-
-void
-SimpleQueue::flush() {
- //if(m_store) m_store->flush(*this);
-}
-
-const std::string&
-SimpleQueue::getName() const {
- return m_name;
-}
-
-void
-SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) {
- if (externalQueueStore != inst && externalQueueStore)
- delete externalQueueStore;
- externalQueueStore = inst;
-}
-
-uint64_t
-SimpleQueue::getSize() {
- return m_persistableData.size();
-}
-
-void
-SimpleQueue::write(char* target) {
- ::memcpy(target, m_persistableData.data(), m_persistableData.size());
-}
-
-// --- Members & methods in msg handling path from qpid::Queue ---
-
-// protected
-SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) :
- m_parent(q),
- m_count(0)
-{}
-
-// protected
-bool
-SimpleQueue::UsageBarrier::acquire()
-{
- qpid::sys::Monitor::ScopedLock l(m_monitor);
- if (m_parent.m_destroyed) {
- return false;
- } else {
- ++m_count;
- return true;
- }
-}
-
-// protected
-void SimpleQueue::UsageBarrier::release()
-{
- qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
- if (--m_count == 0) {
- m_monitor.notifyAll();
- }
-}
-
-// protected
-void SimpleQueue::UsageBarrier::destroy()
-{
- qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
- m_parent.m_destroyed = true;
- while (m_count) {
- m_monitor.wait();
- }
-}
-
-// protected
-SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) :
- m_barrier(b),
- m_acquired(m_barrier.acquire())
-{}
-
-// protected
-SimpleQueue::ScopedUse::~ScopedUse()
-{
- if (m_acquired) {
- m_barrier.release();
- }
-}
-
-// private
-void
-SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
- bool /*isRecovery*/)
-{
- m_messages->push(qm);
-}
-
-// --- End Members & methods in msg handling path from qpid::Queue ---
-
-// private
-bool
-SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm) {
- assert(qm.get());
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- qm->payload(),
- tb,
- &handleAsyncEnqueueResult,
- &m_resultQueue));
- if (tb) {
- tb->incrOpCnt();
- m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac);
- } else {
- m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac);
- }
- ++m_asyncOpCounter;
- return true;
-}
-
-// private static
-void
-SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) {
- if (arh) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
- boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
- boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
- if (arh->getErrNo()) {
- // TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
- << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
- } else {
- sq->enqueueComplete(qc);
- }
- }
-}
-
-// private
-bool
-SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm)
-{
- assert(qm.get());
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- qm->payload(),
- tb,
- &handleAsyncDequeueResult,
- &m_resultQueue));
- if (tb) {
- tb->incrOpCnt();
- m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac);
- } else {
- m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac);
- }
- ++m_asyncOpCounter;
- return true;
-}
-// private static
-void
-SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) {
- if (arh) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
- boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
- boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
- if (arh->getErrNo()) {
- // TODO: Handle async failure here (other than by simply printing a message)
- std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure "
- << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
- } else {
- sq->dequeueComplete(qc);
- }
- }
-}
-
-// private
-void
-SimpleQueue::destroyCheck(const std::string& opDescr) const {
- if (m_destroyPending || m_destroyed) {
- std::ostringstream oss;
- oss << opDescr << " on queue \"" << m_name << "\" after call to destroy";
- throw qpid::Exception(oss.str());
- }
-}
-
-// private
-void
-SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
- if (qc.get()) {
- assert(qc->getQueue().get() == this);
- }
- --m_asyncOpCounter;
-}
-
-// private
-void
-SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
- if (qc.get()) {
- assert(qc->getQueue().get() == this);
- }
- --m_asyncOpCounter;
-}
-
-// private
-void
-SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
- if (qc.get()) {
- assert(qc->getQueue().get() == this);
- }
- --m_asyncOpCounter;
- m_destroyed = true;
-}
-
-// private
-void
-SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
- if (qc.get()) {
- assert(qc->getQueue().get() == this);
- if (qc->getTxnBuffer()) { // transactional enqueue
- qc->getTxnBuffer()->decrOpCnt();
- }
- }
- --m_asyncOpCounter;
-}
-
-// private
-void
-SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
- if (qc.get()) {
- assert(qc->getQueue().get() == this);
- if (qc->getTxnBuffer()) { // transactional enqueue
- qc->getTxnBuffer()->decrOpCnt();
- }
- }
- --m_asyncOpCounter;
-}
-
-}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
deleted file mode 100644
index 5f64c9b960..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file SimpleQueue.h
- */
-
-#ifndef tests_storePerftools_asyncPerf_SimpleQueue_h_
-#define tests_storePerftools_asyncPerf_SimpleQueue_h_
-
-#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter
-#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
-#include "qpid/broker/PersistableQueue.h"
-#include "qpid/broker/QueueHandle.h"
-#include "qpid/sys/Monitor.h"
-
-#include <boost/intrusive_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-
-namespace qpid {
-namespace broker {
-class AsyncResultQueue;
-class QueueAsyncContext;
-class TxnBuffer;
-}
-namespace framing {
-class FieldTable;
-}}
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class MessageConsumer;
-class Messages;
-class QueuedMessage;
-class SimpleMessage;
-
-class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>,
- public qpid::broker::PersistableQueue,
- public qpid::broker::DataSource
-{
-public:
- SimpleQueue(const std::string& name,
- const qpid::framing::FieldTable& args,
- qpid::broker::AsyncStore* store,
- qpid::broker::AsyncResultQueue& arq);
- virtual ~SimpleQueue();
-
- const qpid::broker::QueueHandle& getHandle() const;
- qpid::broker::QueueHandle& getHandle();
- qpid::broker::AsyncStore* getStore();
-
- void asyncCreate();
- static void handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh);
- void asyncDestroy(const bool deleteQueue);
- static void handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh);
-
- // --- Methods in msg handling path from qpid::Queue ---
- void deliver(boost::intrusive_ptr<SimpleMessage> msg);
- bool dispatch(MessageConsumer& mc);
- bool enqueue(boost::shared_ptr<QueuedMessage> qm);
- bool enqueue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm);
- bool dequeue(boost::shared_ptr<QueuedMessage> qm);
- bool dequeue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm);
- void process(boost::intrusive_ptr<SimpleMessage> msg);
- void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg);
-
- // --- Interface qpid::broker::Persistable ---
- virtual void encode(qpid::framing::Buffer& buffer) const;
- virtual uint32_t encodedSize() const;
- virtual uint64_t getPersistenceId() const;
- virtual void setPersistenceId(uint64_t persistenceId) const;
-
- // --- Interface qpid::broker::PersistableQueue ---
- virtual void flush();
- virtual const std::string& getName() const;
- virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst);
-
- // --- Interface qpid::broker::DataStore ---
- virtual uint64_t getSize();
- virtual void write(char* target);
-
-private:
- static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations
-
- const std::string m_name;
- qpid::broker::AsyncStore* m_store;
- qpid::broker::AsyncResultQueue& m_resultQueue;
- qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter!
- mutable uint64_t m_persistenceId;
- std::string m_persistableData;
- qpid::broker::QueueHandle m_queueHandle;
- bool m_destroyPending;
- bool m_destroyed;
-
- // --- Members & methods in msg handling path copied from qpid::Queue ---
- struct UsageBarrier {
- SimpleQueue& m_parent;
- uint32_t m_count;
- qpid::sys::Monitor m_monitor;
- UsageBarrier(SimpleQueue& q);
- bool acquire();
- void release();
- void destroy();
- };
- struct ScopedUse {
- UsageBarrier& m_barrier;
- const bool m_acquired;
- ScopedUse(UsageBarrier& b);
- ~ScopedUse();
- };
- UsageBarrier m_barrier;
- std::auto_ptr<Messages> m_messages;
- void push(boost::shared_ptr<QueuedMessage> qm,
- bool isRecovery = false);
-
- // -- Async ops ---
- bool asyncEnqueue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm);
- static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh);
- bool asyncDequeue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm);
- static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh);
-
- // --- Async op counter ---
- void destroyCheck(const std::string& opDescr) const;
-
- // --- Async op completions (called through handleAsyncResult) ---
- void createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
- void flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
- void destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
- void enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
- void dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
-};
-
-}}} // namespace tests::storePerftools::asyncPerf
-
-#endif // tests_storePerftools_asyncPerf_SimpleQueue_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
index 8c1f2976bf..20e9c39f1b 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
@@ -62,12 +62,10 @@ TestOptions::TestOptions(const uint32_t numMsgs,
doAddOptions();
}
-TestOptions::~TestOptions()
-{}
+TestOptions::~TestOptions() {}
void
-TestOptions::printVals(std::ostream& os) const
-{
+TestOptions::printVals(std::ostream& os) const {
tests::storePerftools::common::TestOptions::printVals(os);
os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl;
os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl;
@@ -77,8 +75,7 @@ TestOptions::printVals(std::ostream& os) const
// private
void
-TestOptions::doAddOptions()
-{
+TestOptions::doAddOptions() {
addOptions()
("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"),
"Num enqueus per transaction (0 = no transactions)")
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp
index cf6f293494..312fa187b8 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp
@@ -32,12 +32,10 @@ TestResult::TestResult(const TestOptions& to) :
m_testOpts(to)
{}
-TestResult::~TestResult()
-{}
+TestResult::~TestResult() {}
void
-TestResult::toStream(std::ostream& os) const
-{
+TestResult::toStream(std::ostream& os) const {
double msgsRate;
os << "TEST RESULTS:" << std::endl;
os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
deleted file mode 100644
index 375cd568d2..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file TxnAccept.cpp
- */
-
-#include "TxnAccept.h"
-
-#include "DeliveryRecord.h"
-
-#include "qpid/log/Statement.h"
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-TxnAccept::TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops) :
- m_ops(ops)
-{}
-
-TxnAccept::~TxnAccept() {}
-
-// --- Interface TxnOp ---
-
-bool
-TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() {
- try {
- for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
- (*i)->dequeue(tb);
- }
- return true;
- } catch (const std::exception& e) {
- QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what());
- } catch (...) {
- QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)");
- }
- return false;
-}
-
-void
-TxnAccept::commit() throw() {
- try {
- for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) {
- (*i)->committed();
- (*i)->setEnded();
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what());
- } catch(...) {
- QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)");
- }
-}
-
-void
-TxnAccept::rollback() throw() {}
-
-}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
deleted file mode 100644
index 5d84289965..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file TxnAccept.h
- */
-
-#ifndef tests_storePerftools_asyncPerf_TxnAccept_h_
-#define tests_storePerftools_asyncPerf_TxnAccept_h_
-
-#include "qpid/broker/TxnOp.h"
-
-#include "boost/shared_ptr.hpp"
-#include <deque>
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class DeliveryRecord;
-
-class TxnAccept: public qpid::broker::TxnOp {
-public:
- TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops);
- virtual ~TxnAccept();
-
- // --- Interface TxnOp ---
- bool prepare(qpid::broker::TxnBuffer* tb) throw();
- void commit() throw();
- void rollback() throw();
-private:
- std::deque<boost::shared_ptr<DeliveryRecord> > m_ops;
-};
-
-}}} // namespace tests::storePerftools::asyncPerf
-
-#endif // tests_storePerftools_asyncPerf_TxnAccept_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
deleted file mode 100644
index cc36a38be7..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file TxnPublish.cpp
- */
-
-#include "TxnPublish.h"
-
-#include "QueuedMessage.h"
-#include "SimpleMessage.h"
-#include "SimpleQueue.h"
-
-#include "qpid/log/Statement.h"
-#include <boost/make_shared.hpp>
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) :
- m_msg(msg)
-{}
-
-TxnPublish::~TxnPublish() {}
-
-bool
-TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() {
- try {
- while (!m_queues.empty()) {
- m_queues.front()->prepareEnqueue(tb);
- m_prepared.push_back(m_queues.front());
- m_queues.pop_front();
- }
- return true;
- } catch (const std::exception& e) {
- QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what());
- } catch (...) {
- QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)");
- }
- return false;
-}
-
-void
-TxnPublish::commit() throw() {
- try {
- for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
- (*i)->commitEnqueue();
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what());
- } catch (...) {
- QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)");
- }
-}
-
-void
-TxnPublish::rollback() throw() {
- try {
- for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
- (*i)->abortEnqueue();
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what());
- } catch (...) {
- QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)");
- }
-}
-
-uint64_t
-TxnPublish::contentSize() {
- return m_msg->contentSize();
-}
-
-void
-TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) {
- m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg)));
- m_delivered = true;
-}
-
-SimpleMessage&
-TxnPublish::getMessage() {
- return *m_msg;
-}
-
-}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
deleted file mode 100644
index eae9ef9c4c..0000000000
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * \file TxnPublish.h
- */
-
-#ifndef tests_storePerftools_asyncPerf_TxnPublish_h_
-#define tests_storePerftools_asyncPerf_TxnPublish_h_
-
-#include "Deliverable.h"
-
-#include "qpid/broker/TxnOp.h"
-
-#include <boost/intrusive_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-#include <list>
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-class QueuedMessage;
-class SimpleMessage;
-class SimpleQueue;
-
-class TxnPublish : public qpid::broker::TxnOp,
- public Deliverable
-{
-public:
- TxnPublish(boost::intrusive_ptr<SimpleMessage> msg);
- virtual ~TxnPublish();
-
- // --- Interface TxOp ---
- bool prepare(qpid::broker::TxnBuffer* tb) throw();
- void commit() throw();
- void rollback() throw();
-
- // --- Interface Deliverable ---
- uint64_t contentSize();
- void deliverTo(const boost::shared_ptr<SimpleQueue>& queue);
- SimpleMessage& getMessage();
-
-private:
- boost::intrusive_ptr<SimpleMessage> m_msg;
- std::list<boost::shared_ptr<QueuedMessage> > m_queues;
- std::list<boost::shared_ptr<QueuedMessage> > m_prepared;
-};
-
-}}} // namespace tests::storePerftools::asyncPerf
-
-#endif // tests_storePerftools_asyncPerf_TxnPublish_h_