summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/TxnOp.h2
-rw-r--r--cpp/src/tests/CMakeLists.txt6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/Deliverable.h8
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp82
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h56
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp10
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h16
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp27
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h18
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp8
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp12
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h18
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp16
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h15
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp)36
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h (renamed from cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h)22
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp)104
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h (renamed from cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h)36
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp52
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h46
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp10
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h14
25 files changed, 455 insertions, 175 deletions
diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/TxnOp.h
index e98429535e..1626e30ccd 100644
--- a/cpp/src/qpid/broker/TxnOp.h
+++ b/cpp/src/qpid/broker/TxnOp.h
@@ -27,6 +27,8 @@
namespace qpid {
namespace broker {
+class TxnHandle;
+
class TxnOp{
public:
virtual ~TxnOp() {}
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index fbe4cfbd7d..ad9a518867 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -380,6 +380,7 @@ endif (UNIX)
# Async store perf test (asyncPerf)
set (asyncStorePerf_SOURCES
storePerftools/asyncPerf/Deliverable.cpp
+ storePerftools/asyncPerf/DeliveryRecord.cpp
storePerftools/asyncPerf/MessageAsyncContext.cpp
storePerftools/asyncPerf/MessageConsumer.cpp
storePerftools/asyncPerf/MessageDeque.cpp
@@ -387,10 +388,11 @@ set (asyncStorePerf_SOURCES
storePerftools/asyncPerf/PerfTest.cpp
storePerftools/asyncPerf/QueueAsyncContext.cpp
storePerftools/asyncPerf/QueuedMessage.cpp
- storePerftools/asyncPerf/SimplePersistableMessage.cpp
- storePerftools/asyncPerf/SimplePersistableQueue.cpp
+ storePerftools/asyncPerf/SimpleMessage.cpp
+ storePerftools/asyncPerf/SimpleQueue.cpp
storePerftools/asyncPerf/TestOptions.cpp
storePerftools/asyncPerf/TestResult.cpp
+ storePerftools/asyncPerf/TxnAccept.cpp
storePerftools/asyncPerf/TxnPublish.cpp
storePerftools/common/Parameters.cpp
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h
index 57e130eeba..990d53a199 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h
@@ -31,8 +31,8 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimplePersistableMessage;
-class SimplePersistableQueue;
+class SimpleMessage;
+class SimpleQueue;
class Deliverable
{
@@ -41,8 +41,8 @@ public:
virtual ~Deliverable();
virtual uint64_t contentSize() = 0;
- virtual void deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue) = 0;
- virtual SimplePersistableMessage& getMessage() = 0;
+ virtual void deliverTo(const boost::shared_ptr<SimpleQueue>& queue) = 0;
+ virtual SimpleMessage& getMessage() = 0;
virtual bool isDelivered() const;
protected:
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
new file mode 100644
index 0000000000..7a0224a9b5
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file DeliveryRecord.cpp
+ */
+
+#include "DeliveryRecord.h"
+
+#include "SimpleMessage.h"
+#include "SimpleQueue.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+DeliveryRecord::DeliveryRecord(const QueuedMessage& qm,
+ bool accepted) :
+ m_queuedMessage(qm),
+ m_accepted(accepted),
+ m_ended(accepted)
+{}
+
+DeliveryRecord::~DeliveryRecord()
+{}
+
+bool
+DeliveryRecord::accept(qpid::broker::TxnHandle* txn)
+{
+ if (!m_ended) {
+ assert(m_queuedMessage.getQueue());
+ m_queuedMessage.getQueue()->dequeue(*txn, m_queuedMessage);
+ m_accepted = true;
+ setEnded();
+ }
+ return isRedundant();
+}
+
+bool
+DeliveryRecord::isAccepted() const
+{
+ return m_accepted;
+}
+
+bool
+DeliveryRecord::setEnded()
+{
+ m_ended = true;
+ m_queuedMessage.payload() = boost::intrusive_ptr<SimpleMessage>(0);
+ return isRedundant();
+}
+
+bool
+DeliveryRecord::isEnded() const
+{
+ return m_ended;
+}
+
+bool
+DeliveryRecord::isRedundant() const
+{
+ return m_ended;
+}
+
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
new file mode 100644
index 0000000000..25b5446a5f
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file DeliveryRecord.h
+ */
+
+#ifndef tests_storePerftools_asyncPerf_DeliveryRecord_h_
+#define tests_storePerftools_asyncPerf_DeliveryRecord_h_
+
+#include "QueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+class TxnHandle;
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class DeliveryRecord {
+public:
+ DeliveryRecord(const QueuedMessage& qm,
+ bool accepted);
+ virtual ~DeliveryRecord();
+ bool accept(qpid::broker::TxnHandle* txn);
+ bool isAccepted() const;
+ bool setEnded();
+ bool isEnded() const;
+ bool isRedundant() const;
+private:
+ QueuedMessage m_queuedMessage;
+ bool m_accepted : 1;
+ bool m_ended : 1;
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_DeliveryRecord_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
index 5e161d49c8..abb6b5c657 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
@@ -22,7 +22,7 @@
*/
#include "MessageAsyncContext.h"
-#include "SimplePersistableMessage.h"
+#include "SimpleMessage.h"
#include <cassert>
@@ -30,9 +30,9 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimplePersistableMessage> msg,
+MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
const qpid::asyncStore::AsyncOperation::opCode op,
- boost::shared_ptr<SimplePersistableQueue> q) :
+ boost::shared_ptr<SimpleQueue> q) :
m_msg(msg),
m_op(op),
m_q(q)
@@ -56,13 +56,13 @@ MessageAsyncContext::getOpStr() const
return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
}
-boost::intrusive_ptr<SimplePersistableMessage>
+boost::intrusive_ptr<SimpleMessage>
MessageAsyncContext::getMessage() const
{
return m_msg;
}
-boost::shared_ptr<SimplePersistableQueue>
+boost::shared_ptr<SimpleQueue>
MessageAsyncContext::getQueue() const
{
return m_q;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
index f13cd4ab64..8418c4c760 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
@@ -34,26 +34,26 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimplePersistableMessage;
-class SimplePersistableQueue;
+class SimpleMessage;
+class SimpleQueue;
class MessageAsyncContext : public qpid::broker::BrokerAsyncContext
{
public:
- MessageAsyncContext(boost::intrusive_ptr<SimplePersistableMessage> msg,
+ MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
const qpid::asyncStore::AsyncOperation::opCode op,
- boost::shared_ptr<SimplePersistableQueue> q);
+ boost::shared_ptr<SimpleQueue> q);
virtual ~MessageAsyncContext();
qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
const char* getOpStr() const;
- boost::intrusive_ptr<SimplePersistableMessage> getMessage() const;
- boost::shared_ptr<SimplePersistableQueue> getQueue() const;
+ boost::intrusive_ptr<SimpleMessage> getMessage() const;
+ boost::shared_ptr<SimpleQueue> getQueue() const;
void destroy();
private:
- boost::intrusive_ptr<SimplePersistableMessage> m_msg;
+ boost::intrusive_ptr<SimpleMessage> m_msg;
const qpid::asyncStore::AsyncOperation::opCode m_op;
- boost::shared_ptr<SimplePersistableQueue> m_q;
+ boost::shared_ptr<SimpleQueue> m_q;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 9b015fc428..1859bde947 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -23,9 +23,12 @@
#include "MessageConsumer.h"
-#include "SimplePersistableQueue.h"
+#include "SimpleQueue.h"
#include "TestOptions.h"
+#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/broker/TxnBuffer.h"
+
#include <stdint.h> // uint32_t
namespace tests {
@@ -33,8 +36,12 @@ namespace storePerftools {
namespace asyncPerf {
MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
- boost::shared_ptr<SimplePersistableQueue> queue) :
+ qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& arq,
+ boost::shared_ptr<SimpleQueue> queue) :
m_perfTestParams(perfTestParams),
+ m_store(store),
+ m_resultQueue(arq),
m_queue(queue)
{}
@@ -44,6 +51,13 @@ MessageConsumer::~MessageConsumer()
void*
MessageConsumer::runConsumers()
{
+ const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
+ uint16_t txnCnt = 0U;
+ qpid::broker::TxnBuffer* tb = 0;
+ if (useTxns) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ }
+
uint32_t numMsgs = 0;
while (numMsgs < m_perfTestParams.m_numMsgs) {
if (m_queue->dispatch()) {
@@ -52,6 +66,15 @@ MessageConsumer::runConsumers()
::usleep(1000); // TODO - replace this poller with condition variable
}
}
+
+ if (txnCnt) {
+ if (m_perfTestParams.m_durable) {
+ tb->commitLocal(m_store);
+ } else {
+ tb->commit();
+ }
+ }
+
return reinterpret_cast<void*>(0);
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
index 7f5816b6a0..5404fe9f58 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
@@ -26,25 +26,37 @@
#include "boost/shared_ptr.hpp"
+namespace qpid {
+namespace asyncStore {
+class AsyncStoreImpl;
+}
+namespace broker {
+class AsyncResultQueue;
+}}
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimplePersistableQueue;
+class SimpleQueue;
class TestOptions;
class MessageConsumer
{
public:
MessageConsumer(const TestOptions& perfTestParams,
- boost::shared_ptr<SimplePersistableQueue> queue);
+ qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& arq,
+ boost::shared_ptr<SimpleQueue> queue);
virtual ~MessageConsumer();
void* runConsumers();
static void* startConsumers(void* ptr);
private:
const TestOptions& m_perfTestParams;
- boost::shared_ptr<SimplePersistableQueue> m_queue;
+ qpid::asyncStore::AsyncStoreImpl* m_store;
+ qpid::broker::AsyncResultQueue& m_resultQueue;
+ boost::shared_ptr<SimpleQueue> m_queue;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
index 008ddf33e7..0cab537fb0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
@@ -23,8 +23,8 @@
#include "MessageProducer.h"
-#include "SimplePersistableMessage.h"
-#include "SimplePersistableQueue.h"
+#include "SimpleMessage.h"
+#include "SimpleQueue.h"
#include "TestOptions.h"
#include "TxnPublish.h"
@@ -41,7 +41,7 @@ MessageProducer::MessageProducer(const TestOptions& perfTestParams,
const char* msgData,
qpid::asyncStore::AsyncStoreImpl* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimplePersistableQueue> queue) :
+ boost::shared_ptr<SimpleQueue> queue) :
m_perfTestParams(perfTestParams),
m_msgData(msgData),
m_store(store),
@@ -62,7 +62,7 @@ MessageProducer::runProducers()
tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) {
- boost::intrusive_ptr<SimplePersistableMessage> msg(new SimplePersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store));
+ boost::intrusive_ptr<SimpleMessage> msg(new SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store));
if (useTxns) {
boost::shared_ptr<TxnPublish> op(new TxnPublish(msg));
op->deliverTo(m_queue);
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
index 55504164ef..7fa74a2c51 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
@@ -38,7 +38,7 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimplePersistableQueue;
+class SimpleQueue;
class TestOptions;
class TxnBuffer;
@@ -49,7 +49,7 @@ public:
const char* msgData,
qpid::asyncStore::AsyncStoreImpl* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimplePersistableQueue> queue);
+ boost::shared_ptr<SimpleQueue> queue);
virtual ~MessageProducer();
void* runProducers();
static void* startProducers(void* ptr);
@@ -58,7 +58,7 @@ private:
const char* m_msgData;
qpid::asyncStore::AsyncStoreImpl* m_store;
qpid::broker::AsyncResultQueue& m_resultQueue;
- boost::shared_ptr<SimplePersistableQueue> m_queue;
+ boost::shared_ptr<SimpleQueue> m_queue;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index 7941e761ca..e3fdd1c44d 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -25,7 +25,7 @@
#include "MessageConsumer.h"
#include "MessageProducer.h"
-#include "SimplePersistableQueue.h"
+#include "SimpleQueue.h"
#include "tests/storePerftools/version.h"
#include "tests/storePerftools/common/ScopedTimer.h"
@@ -87,7 +87,7 @@ PerfTest::run()
reinterpret_cast<void*>(mp.get())));
threads.push_back(tp);
}
- boost::shared_ptr<MessageConsumer> mc(new MessageConsumer(m_testOpts, m_queueList[q]));
+ boost::shared_ptr<MessageConsumer> mc(new MessageConsumer(m_testOpts, m_store, m_resultQueue, m_queueList[q]));
m_consumers.push_back(mc);
for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads
boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mc->startConsumers,
@@ -138,7 +138,7 @@ PerfTest::prepareQueues()
for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
std::ostringstream qname;
qname << "queue_" << std::setw(4) << std::setfill('0') << i;
- boost::shared_ptr<SimplePersistableQueue> mpq(new SimplePersistableQueue(qname.str(), m_queueArgs, m_store, m_resultQueue));
+ boost::shared_ptr<SimpleQueue> mpq(new SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue));
mpq->asyncCreate();
m_queueList.push_back(mpq);
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
index e42db090d2..6cdf015f76 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
@@ -48,7 +48,7 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimplePersistableQueue;
+class SimpleQueue;
class MessageConsumer;
class MessageProducer;
class TestOptions;
@@ -72,7 +72,7 @@ private:
qpid::sys::Thread m_pollingThread;
qpid::broker::AsyncResultQueueImpl m_resultQueue;
qpid::asyncStore::AsyncStoreImpl* m_store;
- std::deque<boost::shared_ptr<SimplePersistableQueue> > m_queueList;
+ std::deque<boost::shared_ptr<SimpleQueue> > m_queueList;
std::deque<boost::shared_ptr<MessageProducer> > m_producers;
std::deque<boost::shared_ptr<MessageConsumer> > m_consumers;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
index 07a80c8a33..c1c657727b 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
@@ -22,7 +22,7 @@
*/
#include "QueueAsyncContext.h"
-#include "SimplePersistableMessage.h"
+#include "SimpleMessage.h"
#include <cassert>
@@ -30,7 +30,7 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q,
+QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
qpid::broker::TxnHandle& th,
const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
@@ -44,8 +44,8 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q
assert(m_q.get() != 0);
}
-QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q,
- boost::intrusive_ptr<SimplePersistableMessage> msg,
+QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
+ boost::intrusive_ptr<SimpleMessage> msg,
qpid::broker::TxnHandle& th,
const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
@@ -76,13 +76,13 @@ QueueAsyncContext::getOpStr() const
return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
}
-boost::shared_ptr<SimplePersistableQueue>
+boost::shared_ptr<SimpleQueue>
QueueAsyncContext::getQueue() const
{
return m_q;
}
-boost::intrusive_ptr<SimplePersistableMessage>
+boost::intrusive_ptr<SimpleMessage>
QueueAsyncContext::getMessage() const
{
return m_msg;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
index b4d16fe615..112a5ab1dd 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
@@ -36,19 +36,19 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimplePersistableMessage;
-class SimplePersistableQueue;
+class SimpleMessage;
+class SimpleQueue;
class QueueAsyncContext: public qpid::broker::BrokerAsyncContext
{
public:
- QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q,
+ QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
qpid::broker::TxnHandle& th,
const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
qpid::broker::AsyncResultQueue* const arq);
- QueueAsyncContext(boost::shared_ptr<SimplePersistableQueue> q,
- boost::intrusive_ptr<SimplePersistableMessage> msg,
+ QueueAsyncContext(boost::shared_ptr<SimpleQueue> q,
+ boost::intrusive_ptr<SimpleMessage> msg,
qpid::broker::TxnHandle& th,
const qpid::asyncStore::AsyncOperation::opCode op,
qpid::broker::AsyncResultCallback rcb,
@@ -56,8 +56,8 @@ public:
virtual ~QueueAsyncContext();
qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
const char* getOpStr() const;
- boost::shared_ptr<SimplePersistableQueue> getQueue() const;
- boost::intrusive_ptr<SimplePersistableMessage> getMessage() const;
+ boost::shared_ptr<SimpleQueue> getQueue() const;
+ boost::intrusive_ptr<SimpleMessage> getMessage() const;
qpid::broker::TxnHandle getTxnHandle() const;
qpid::broker::AsyncResultQueue* getAsyncResultQueue() const;
qpid::broker::AsyncResultCallback getAsyncResultCallback() const;
@@ -65,8 +65,8 @@ public:
void destroy();
private:
- boost::shared_ptr<SimplePersistableQueue> m_q;
- boost::intrusive_ptr<SimplePersistableMessage> m_msg;
+ boost::shared_ptr<SimpleQueue> m_q;
+ boost::intrusive_ptr<SimpleMessage> m_msg;
qpid::broker::TxnHandle m_th;
const qpid::asyncStore::AsyncOperation::opCode m_op;
qpid::broker::AsyncResultCallback m_rcb;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
index 8ee858587b..11af7c9466 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
@@ -23,8 +23,8 @@
#include "QueuedMessage.h"
-#include "SimplePersistableMessage.h"
-#include "SimplePersistableQueue.h"
+#include "SimpleMessage.h"
+#include "SimpleQueue.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
@@ -36,8 +36,8 @@ QueuedMessage::QueuedMessage() :
m_queue(0)
{}
-QueuedMessage::QueuedMessage(SimplePersistableQueue* q,
- boost::intrusive_ptr<SimplePersistableMessage> msg) :
+QueuedMessage::QueuedMessage(SimpleQueue* q,
+ boost::intrusive_ptr<SimpleMessage> msg) :
m_queue(q),
m_msg(msg),
m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0))
@@ -61,7 +61,13 @@ QueuedMessage::operator=(const QueuedMessage& rhs)
return *this;
}
-boost::intrusive_ptr<SimplePersistableMessage>
+SimpleQueue*
+QueuedMessage::getQueue() const
+{
+ return m_queue;
+}
+
+boost::intrusive_ptr<SimpleMessage>
QueuedMessage::payload() const
{
return m_msg;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
index 896c53ab5b..12c8e4da08 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
@@ -39,19 +39,20 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimplePersistableMessage;
-class SimplePersistableQueue;
+class SimpleMessage;
+class SimpleQueue;
class QueuedMessage
{
public:
QueuedMessage();
- QueuedMessage(SimplePersistableQueue* q,
- boost::intrusive_ptr<SimplePersistableMessage> msg);
+ QueuedMessage(SimpleQueue* q,
+ boost::intrusive_ptr<SimpleMessage> msg);
QueuedMessage(const QueuedMessage& qm);
~QueuedMessage();
QueuedMessage& operator=(const QueuedMessage& rhs);
- boost::intrusive_ptr<SimplePersistableMessage> payload() const;
+ SimpleQueue* getQueue() const;
+ boost::intrusive_ptr<SimpleMessage> payload() const;
const qpid::broker::EnqueueHandle& enqHandle() const;
qpid::broker::EnqueueHandle& enqHandle();
@@ -61,8 +62,8 @@ public:
void abortEnqueue();
private:
- SimplePersistableQueue* m_queue;
- boost::intrusive_ptr<SimplePersistableMessage> m_msg;
+ SimpleQueue* m_queue;
+ boost::intrusive_ptr<SimpleMessage> m_msg;
qpid::broker::EnqueueHandle m_enqHandle;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
index a9771c1442..29db6ceaf2 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
@@ -18,10 +18,10 @@
*/
/**
- * \file SimplePersistableMessage.cpp
+ * \file SimpleMessage.cpp
*/
-#include "SimplePersistableMessage.h"
+#include "SimpleMessage.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
@@ -29,83 +29,83 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-SimplePersistableMessage::SimplePersistableMessage(const char* msgData,
- const uint32_t msgSize,
- qpid::asyncStore::AsyncStoreImpl* store) :
+SimpleMessage::SimpleMessage(const char* msgData,
+ const uint32_t msgSize,
+ qpid::asyncStore::AsyncStoreImpl* store) :
m_persistenceId(0ULL),
m_msg(msgData, static_cast<size_t>(msgSize)),
m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0))
{}
-SimplePersistableMessage::~SimplePersistableMessage()
+SimpleMessage::~SimpleMessage()
{}
const qpid::broker::MessageHandle&
-SimplePersistableMessage::getHandle() const
+SimpleMessage::getHandle() const
{
return m_msgHandle;
}
qpid::broker::MessageHandle&
-SimplePersistableMessage::getHandle()
+SimpleMessage::getHandle()
{
return m_msgHandle;
}
uint64_t
-SimplePersistableMessage::contentSize() const
+SimpleMessage::contentSize() const
{
return static_cast<uint64_t>(m_msg.size());
}
void
-SimplePersistableMessage::setPersistenceId(uint64_t id) const
+SimpleMessage::setPersistenceId(uint64_t id) const
{
m_persistenceId = id;
}
uint64_t
-SimplePersistableMessage::getPersistenceId() const
+SimpleMessage::getPersistenceId() const
{
return m_persistenceId;
}
void
-SimplePersistableMessage::encode(qpid::framing::Buffer& buffer) const
+SimpleMessage::encode(qpid::framing::Buffer& buffer) const
{
buffer.putRawData(m_msg);
}
uint32_t
-SimplePersistableMessage::encodedSize() const
+SimpleMessage::encodedSize() const
{
return static_cast<uint32_t>(m_msg.size());
}
void
-SimplePersistableMessage::allDequeuesComplete()
+SimpleMessage::allDequeuesComplete()
{}
uint32_t
-SimplePersistableMessage::encodedHeaderSize() const
+SimpleMessage::encodedHeaderSize() const
{
return 0;
}
bool
-SimplePersistableMessage::isPersistent() const
+SimpleMessage::isPersistent() const
{
return m_msgHandle.isValid();
}
uint64_t
-SimplePersistableMessage::getSize()
+SimpleMessage::getSize()
{
return m_msg.size();
}
void
-SimplePersistableMessage::write(char* target)
+SimpleMessage::write(char* target)
{
::memcpy(target, m_msg.data(), m_msg.size());
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
index 7ac54ddea1..1b3e034814 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
@@ -18,11 +18,11 @@
*/
/**
- * \file SimplePersistableMessage.h
+ * \file SimpleMessage.h
*/
-#ifndef tests_storePerftools_asyncPerf_SimplePersistableMessage_h_
-#define tests_storePerftools_asyncPerf_SimplePersistableMessage_h_
+#ifndef tests_storePerftools_asyncPerf_SimpleMessage_h_
+#define tests_storePerftools_asyncPerf_SimpleMessage_h_
#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
#include "qpid/broker/MessageHandle.h"
@@ -39,16 +39,16 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimplePersistableQueue;
+class SimpleQueue;
-class SimplePersistableMessage: public qpid::broker::PersistableMessage,
- public qpid::broker::DataSource
+class SimpleMessage: public qpid::broker::PersistableMessage,
+ public qpid::broker::DataSource
{
public:
- SimplePersistableMessage(const char* msgData,
- const uint32_t msgSize,
- qpid::asyncStore::AsyncStoreImpl* store);
- virtual ~SimplePersistableMessage();
+ SimpleMessage(const char* msgData,
+ const uint32_t msgSize,
+ qpid::asyncStore::AsyncStoreImpl* store);
+ virtual ~SimpleMessage();
const qpid::broker::MessageHandle& getHandle() const;
qpid::broker::MessageHandle& getHandle();
uint64_t contentSize() const;
@@ -76,4 +76,4 @@ private:
}}} // namespace tests::storePerftools::asyncPerf
-#endif // tests_storePerftools_asyncPerf_SimplePersistableMessage_h_
+#endif // tests_storePerftools_asyncPerf_SimpleMessage_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index be2b4c891b..d8b312f011 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -18,13 +18,13 @@
*/
/**
- * \file SimplePersistableQueue.cpp
+ * \file SimpleQueue.cpp
*/
-#include "SimplePersistableQueue.h"
+#include "SimpleQueue.h"
#include "MessageDeque.h"
-#include "SimplePersistableMessage.h"
+#include "SimpleMessage.h"
#include "QueueAsyncContext.h"
#include "QueuedMessage.h"
@@ -37,13 +37,13 @@ namespace storePerftools {
namespace asyncPerf {
//static
-qpid::broker::TxnHandle SimplePersistableQueue::s_nullTxnHandle; // used for non-txn operations
+qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations
-SimplePersistableQueue::SimplePersistableQueue(const std::string& name,
- const qpid::framing::FieldTable& /*args*/,
- qpid::asyncStore::AsyncStoreImpl* store,
- qpid::broker::AsyncResultQueue& arq) :
+SimpleQueue::SimpleQueue(const std::string& name,
+ const qpid::framing::FieldTable& /*args*/,
+ qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& arq) :
qpid::broker::PersistableQueue(),
m_name(name),
m_store(store),
@@ -62,7 +62,7 @@ SimplePersistableQueue::SimplePersistableQueue(const std::string& name,
}
}
-SimplePersistableQueue::~SimplePersistableQueue()
+SimpleQueue::~SimpleQueue()
{
// m_store->flush(*this);
// TODO: Make destroying the store a test parameter
@@ -72,7 +72,7 @@ SimplePersistableQueue::~SimplePersistableQueue()
// static
void
-SimplePersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh)
+SimpleQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle* const arh)
{
if (arh) {
boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
@@ -81,7 +81,7 @@ SimplePersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle*
std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
} else {
-//std::cout << "QQQ SimplePersistableQueue::handleAsyncResult() op=" << qc->getOpStr() << std::endl << std::flush;
+//std::cout << "QQQ SimpleQueue::handleAsyncResult() op=" << qc->getOpStr() << std::endl << std::flush;
// Handle async success here
switch(qc->getOpCode()) {
case qpid::asyncStore::AsyncOperation::QUEUE_CREATE:
@@ -101,7 +101,7 @@ SimplePersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle*
break;
default:
std::ostringstream oss;
- oss << "tests::storePerftools::asyncPerf::SimplePersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode();
+ oss << "tests::storePerftools::asyncPerf::SimpleQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode();
throw qpid::Exception(oss.str());
};
}
@@ -109,25 +109,25 @@ SimplePersistableQueue::handleAsyncResult(const qpid::broker::AsyncResultHandle*
}
const qpid::broker::QueueHandle&
-SimplePersistableQueue::getHandle() const
+SimpleQueue::getHandle() const
{
return m_queueHandle;
}
qpid::broker::QueueHandle&
-SimplePersistableQueue::getHandle()
+SimpleQueue::getHandle()
{
return m_queueHandle;
}
qpid::asyncStore::AsyncStoreImpl*
-SimplePersistableQueue::getStore()
+SimpleQueue::getStore()
{
return m_store;
}
void
-SimplePersistableQueue::asyncCreate()
+SimpleQueue::asyncCreate()
{
if (m_store) {
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
@@ -143,7 +143,7 @@ SimplePersistableQueue::asyncCreate()
}
void
-SimplePersistableQueue::asyncDestroy(const bool deleteQueue)
+SimpleQueue::asyncDestroy(const bool deleteQueue)
{
m_destroyPending = true;
if (m_store) {
@@ -162,7 +162,7 @@ SimplePersistableQueue::asyncDestroy(const bool deleteQueue)
}
void
-SimplePersistableQueue::deliver(boost::intrusive_ptr<SimplePersistableMessage> msg)
+SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
{
QueuedMessage qm(this, msg);
enqueue(s_nullTxnHandle, qm);
@@ -170,7 +170,7 @@ SimplePersistableQueue::deliver(boost::intrusive_ptr<SimplePersistableMessage> m
}
bool
-SimplePersistableQueue::dispatch()
+SimpleQueue::dispatch()
{
QueuedMessage qm;
if (m_messages->consume(qm)) {
@@ -180,8 +180,8 @@ SimplePersistableQueue::dispatch()
}
bool
-SimplePersistableQueue::enqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
+ QueuedMessage& qm)
{
ScopedUse u(m_barrier);
if (!u.m_acquired) {
@@ -195,8 +195,8 @@ SimplePersistableQueue::enqueue(qpid::broker::TxnHandle& th,
}
bool
-SimplePersistableQueue::dequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
+ QueuedMessage& qm)
{
ScopedUse u(m_barrier);
if (!u.m_acquired) {
@@ -210,54 +210,54 @@ SimplePersistableQueue::dequeue(qpid::broker::TxnHandle& th,
}
void
-SimplePersistableQueue::process(boost::intrusive_ptr<SimplePersistableMessage> msg)
+SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
{
QueuedMessage qm(this, msg);
push(qm);
}
void
-SimplePersistableQueue::enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> /*msg*/)
+SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage> /*msg*/)
{}
void
-SimplePersistableQueue::encode(qpid::framing::Buffer& buffer) const
+SimpleQueue::encode(qpid::framing::Buffer& buffer) const
{
buffer.putShortString(m_name);
}
uint32_t
-SimplePersistableQueue::encodedSize() const
+SimpleQueue::encodedSize() const
{
return m_name.size() + 1;
}
uint64_t
-SimplePersistableQueue::getPersistenceId() const
+SimpleQueue::getPersistenceId() const
{
return m_persistenceId;
}
void
-SimplePersistableQueue::setPersistenceId(uint64_t persistenceId) const
+SimpleQueue::setPersistenceId(uint64_t persistenceId) const
{
m_persistenceId = persistenceId;
}
void
-SimplePersistableQueue::flush()
+SimpleQueue::flush()
{
//if(m_store) m_store->flush(*this);
}
const std::string&
-SimplePersistableQueue::getName() const
+SimpleQueue::getName() const
{
return m_name;
}
void
-SimplePersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst)
+SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst)
{
if (externalQueueStore != inst && externalQueueStore)
delete externalQueueStore;
@@ -265,13 +265,13 @@ SimplePersistableQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore*
}
uint64_t
-SimplePersistableQueue::getSize()
+SimpleQueue::getSize()
{
return m_persistableData.size();
}
void
-SimplePersistableQueue::write(char* target)
+SimpleQueue::write(char* target)
{
::memcpy(target, m_persistableData.data(), m_persistableData.size());
}
@@ -279,14 +279,14 @@ SimplePersistableQueue::write(char* target)
// --- Members & methods in msg handling path from qpid::Queue ---
// protected
-SimplePersistableQueue::UsageBarrier::UsageBarrier(SimplePersistableQueue& q) :
+SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) :
m_parent(q),
m_count(0)
{}
// protected
bool
-SimplePersistableQueue::UsageBarrier::acquire()
+SimpleQueue::UsageBarrier::acquire()
{
qpid::sys::Monitor::ScopedLock l(m_monitor);
if (m_parent.m_destroyed) {
@@ -298,7 +298,7 @@ SimplePersistableQueue::UsageBarrier::acquire()
}
// protected
-void SimplePersistableQueue::UsageBarrier::release()
+void SimpleQueue::UsageBarrier::release()
{
qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
if (--m_count == 0) {
@@ -307,7 +307,7 @@ void SimplePersistableQueue::UsageBarrier::release()
}
// protected
-void SimplePersistableQueue::UsageBarrier::destroy()
+void SimpleQueue::UsageBarrier::destroy()
{
qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
m_parent.m_destroyed = true;
@@ -317,13 +317,13 @@ void SimplePersistableQueue::UsageBarrier::destroy()
}
// protected
-SimplePersistableQueue::ScopedUse::ScopedUse(UsageBarrier& b) :
+SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) :
m_barrier(b),
m_acquired(m_barrier.acquire())
{}
// protected
-SimplePersistableQueue::ScopedUse::~ScopedUse()
+SimpleQueue::ScopedUse::~ScopedUse()
{
if (m_acquired) {
m_barrier.release();
@@ -332,8 +332,8 @@ SimplePersistableQueue::ScopedUse::~ScopedUse()
// private
void
-SimplePersistableQueue::push(QueuedMessage& qm,
- bool /*isRecovery*/)
+SimpleQueue::push(QueuedMessage& qm,
+ bool /*isRecovery*/)
{
QueuedMessage removed;
m_messages->push(qm, removed);
@@ -343,8 +343,8 @@ SimplePersistableQueue::push(QueuedMessage& qm,
// private
bool
-SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
+ QueuedMessage& qm)
{
qm.payload()->setPersistenceId(m_store->getNextRid());
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
@@ -366,8 +366,8 @@ SimplePersistableQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
// private
bool
-SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th,
- QueuedMessage& qm)
+SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
+ QueuedMessage& qm)
{
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
@@ -388,7 +388,7 @@ SimplePersistableQueue::asyncDequeue(qpid::broker::TxnHandle& th,
// private
void
-SimplePersistableQueue::destroyCheck(const std::string& opDescr) const
+SimpleQueue::destroyCheck(const std::string& opDescr) const
{
if (m_destroyPending || m_destroyed) {
std::ostringstream oss;
@@ -399,7 +399,7 @@ SimplePersistableQueue::destroyCheck(const std::string& opDescr) const
// private
void
-SimplePersistableQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc)
{
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
@@ -408,7 +408,7 @@ SimplePersistableQueue::createComplete(const boost::shared_ptr<QueueAsyncContext
// private
void
-SimplePersistableQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc)
{
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
@@ -417,7 +417,7 @@ SimplePersistableQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext>
// private
void
-SimplePersistableQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc)
{
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
@@ -427,7 +427,7 @@ SimplePersistableQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContex
// private
void
-SimplePersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
{
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
assert(qc->getQueue().get() == this);
@@ -441,7 +441,7 @@ SimplePersistableQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContex
// private
void
-SimplePersistableQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
+SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc)
{
//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
assert(qc->getQueue().get() == this);
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
index 34ef9407ac..bc9dda0d98 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimplePersistableQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
@@ -18,11 +18,11 @@
*/
/**
- * \file SimplePersistableQueue.h
+ * \file SimpleQueue.h
*/
-#ifndef tests_storePerftools_asyncPerf_SimplePersistableQueue_h_
-#define tests_storePerftools_asyncPerf_SimplePersistableQueue_h_
+#ifndef tests_storePerftools_asyncPerf_SimpleQueue_h_
+#define tests_storePerftools_asyncPerf_SimpleQueue_h_
#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter
#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
@@ -51,20 +51,20 @@ namespace storePerftools {
namespace asyncPerf {
class Messages;
-class SimplePersistableMessage;
+class SimpleMessage;
class QueueAsyncContext;
class QueuedMessage;
-class SimplePersistableQueue : public boost::enable_shared_from_this<SimplePersistableQueue>,
- public qpid::broker::PersistableQueue,
- public qpid::broker::DataSource
+class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>,
+ public qpid::broker::PersistableQueue,
+ public qpid::broker::DataSource
{
public:
- SimplePersistableQueue(const std::string& name,
- const qpid::framing::FieldTable& args,
- qpid::asyncStore::AsyncStoreImpl* store,
- qpid::broker::AsyncResultQueue& arq);
- virtual ~SimplePersistableQueue();
+ SimpleQueue(const std::string& name,
+ const qpid::framing::FieldTable& args,
+ qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& arq);
+ virtual ~SimpleQueue();
static void handleAsyncResult(const qpid::broker::AsyncResultHandle* const res);
const qpid::broker::QueueHandle& getHandle() const;
@@ -75,14 +75,14 @@ public:
void asyncDestroy(const bool deleteQueue);
// --- Methods in msg handling path from qpid::Queue ---
- void deliver(boost::intrusive_ptr<SimplePersistableMessage> msg);
+ void deliver(boost::intrusive_ptr<SimpleMessage> msg);
bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param
bool enqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm);
bool dequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm);
- void process(boost::intrusive_ptr<SimplePersistableMessage> msg);
- void enqueueAborted(boost::intrusive_ptr<SimplePersistableMessage> msg);
+ void process(boost::intrusive_ptr<SimpleMessage> msg);
+ void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg);
// --- Interface qpid::broker::Persistable ---
virtual void encode(qpid::framing::Buffer& buffer) const;
@@ -115,10 +115,10 @@ private:
// --- Members & methods in msg handling path copied from qpid::Queue ---
struct UsageBarrier
{
- SimplePersistableQueue& m_parent;
+ SimpleQueue& m_parent;
uint32_t m_count;
qpid::sys::Monitor m_monitor;
- UsageBarrier(SimplePersistableQueue& q);
+ UsageBarrier(SimpleQueue& q);
bool acquire();
void release();
void destroy();
@@ -154,4 +154,4 @@ private:
}}} // namespace tests::storePerftools::asyncPerf
-#endif // tests_storePerftools_asyncPerf_SimplePersistableQueue_h_
+#endif // tests_storePerftools_asyncPerf_SimpleQueue_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
new file mode 100644
index 0000000000..c1d35805a6
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnAccept.cpp
+ */
+
+#include "TxnAccept.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+TxnAccept::TxnAccept()
+{}
+
+TxnAccept::~TxnAccept()
+{}
+
+// --- Interface TxnOp ---
+
+bool
+TxnAccept::prepare(qpid::broker::TxnHandle& /*th*/) throw()
+{
+ return false;
+}
+
+void
+TxnAccept::commit() throw()
+{}
+
+void
+TxnAccept::rollback() throw()
+{}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
new file mode 100644
index 0000000000..f164a4c965
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TxnAccept.h
+ */
+
+#ifndef tests_storePerftools_asyncPerf_TxnAccept_h_
+#define tests_storePerftools_asyncPerf_TxnAccept_h_
+
+#include "qpid/broker/TxnOp.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class TxnAccept: public qpid::broker::TxnOp {
+public:
+ TxnAccept();
+ virtual ~TxnAccept();
+
+ // --- Interface TxnOp ---
+ bool prepare(qpid::broker::TxnHandle& th) throw();
+ void commit() throw();
+ void rollback() throw();
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_TxnAccept_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
index eff7646de6..2927dc60e2 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
@@ -21,8 +21,8 @@
* \file TxnPublish.cpp
*/
-#include "SimplePersistableMessage.h"
-#include "SimplePersistableQueue.h" // debug msg
+#include "SimpleMessage.h"
+#include "SimpleQueue.h" // debug msg
#include "TxnPublish.h"
#include "QueuedMessage.h"
@@ -31,7 +31,7 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-TxnPublish::TxnPublish(boost::intrusive_ptr<SimplePersistableMessage> msg) :
+TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) :
m_msg(msg)
{
//std::cout << "TTT new TxnPublish" << std::endl << std::flush;
@@ -96,7 +96,7 @@ TxnPublish::contentSize()
}
void
-TxnPublish::deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue)
+TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue)
{
//std::cout << "TTT TxnPublish::deliverTo queue=\"" << queue->getName() << "\"" << std::endl << std::flush;
boost::shared_ptr<QueuedMessage> qm(new QueuedMessage(queue.get(), m_msg));
@@ -104,7 +104,7 @@ TxnPublish::deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue)
m_delivered = true;
}
-SimplePersistableMessage&
+SimpleMessage&
TxnPublish::getMessage()
{
return *m_msg;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
index 6e97e99349..a7255314bd 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
@@ -34,9 +34,7 @@
namespace qpid {
namespace broker {
-
class TransactionContext;
-
}}
namespace tests {
@@ -44,14 +42,14 @@ namespace storePerftools {
namespace asyncPerf {
class QueuedMessage;
-class SimplePersistableMessage;
-class SimplePersistableQueue;
+class SimpleMessage;
+class SimpleQueue;
class TxnPublish : public qpid::broker::TxnOp,
public Deliverable
{
public:
- TxnPublish(boost::intrusive_ptr<SimplePersistableMessage> msg);
+ TxnPublish(boost::intrusive_ptr<SimpleMessage> msg);
virtual ~TxnPublish();
// --- Interface TxOp ---
@@ -61,11 +59,11 @@ public:
// --- Interface Deliverable ---
uint64_t contentSize();
- void deliverTo(const boost::shared_ptr<SimplePersistableQueue>& queue);
- SimplePersistableMessage& getMessage();
+ void deliverTo(const boost::shared_ptr<SimpleQueue>& queue);
+ SimpleMessage& getMessage();
private:
- boost::intrusive_ptr<SimplePersistableMessage> m_msg;
+ boost::intrusive_ptr<SimpleMessage> m_msg;
std::list<boost::shared_ptr<QueuedMessage> > m_queues;
std::list<boost::shared_ptr<QueuedMessage> > m_prepared;
};