summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/asyncstore.cmake11
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp4
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h7
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.cpp4
-rw-r--r--cpp/src/qpid/asyncStore/TxnHandleImpl.h8
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h8
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.cpp6
-rw-r--r--cpp/src/qpid/broker/QueueAsyncContext.h8
-rw-r--r--cpp/src/qpid/broker/SimpleConsumer.h42
-rw-r--r--cpp/src/qpid/broker/SimpleDeliverable.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp)19
-rw-r--r--cpp/src/qpid/broker/SimpleDeliverable.h (renamed from cpp/src/tests/storePerftools/asyncPerf/Deliverable.h)21
-rw-r--r--cpp/src/qpid/broker/SimpleDeliveryRecord.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp)52
-rw-r--r--cpp/src/qpid/broker/SimpleDeliveryRecord.h (renamed from cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h)41
-rw-r--r--cpp/src/qpid/broker/SimpleMessage.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp)56
-rw-r--r--cpp/src/qpid/broker/SimpleMessage.h (renamed from cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h)33
-rw-r--r--cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp)27
-rw-r--r--cpp/src/qpid/broker/SimpleMessageAsyncContext.h (renamed from cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h)25
-rw-r--r--cpp/src/qpid/broker/SimpleMessageDeque.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp)28
-rw-r--r--cpp/src/qpid/broker/SimpleMessageDeque.h (renamed from cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h)32
-rw-r--r--cpp/src/qpid/broker/SimpleMessages.h (renamed from cpp/src/tests/storePerftools/asyncPerf/Messages.h)25
-rw-r--r--cpp/src/qpid/broker/SimpleQueue.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp)155
-rw-r--r--cpp/src/qpid/broker/SimpleQueue.h (renamed from cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h)99
-rw-r--r--cpp/src/qpid/broker/SimpleQueuedMessage.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp)57
-rw-r--r--cpp/src/qpid/broker/SimpleQueuedMessage.h (renamed from cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h)40
-rw-r--r--cpp/src/qpid/broker/SimpleTxnAccept.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp)27
-rw-r--r--cpp/src/qpid/broker/SimpleTxnAccept.h (renamed from cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h)29
-rw-r--r--cpp/src/qpid/broker/SimpleTxnBuffer.cpp (renamed from cpp/src/qpid/broker/TxnBuffer.cpp)52
-rw-r--r--cpp/src/qpid/broker/SimpleTxnBuffer.h (renamed from cpp/src/qpid/broker/TxnBuffer.h)22
-rw-r--r--cpp/src/qpid/broker/SimpleTxnOp.h (renamed from cpp/src/qpid/broker/TxnOp.h)16
-rw-r--r--cpp/src/qpid/broker/SimpleTxnPublish.cpp (renamed from cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp)35
-rw-r--r--cpp/src/qpid/broker/SimpleTxnPublish.h (renamed from cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h)37
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.cpp4
-rw-r--r--cpp/src/qpid/broker/TxnAsyncContext.h12
-rw-r--r--cpp/src/tests/asyncstore.cmake18
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp25
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h24
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp23
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h16
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp36
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.h9
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp9
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp6
42 files changed, 576 insertions, 632 deletions
diff --git a/cpp/src/asyncstore.cmake b/cpp/src/asyncstore.cmake
index 5656bc8cc4..6171ed5505 100644
--- a/cpp/src/asyncstore.cmake
+++ b/cpp/src/asyncstore.cmake
@@ -64,8 +64,17 @@ set (asyncStore_SOURCES
qpid/broker/MessageHandle.cpp
qpid/broker/QueueAsyncContext.cpp
qpid/broker/QueueHandle.cpp
+ qpid/broker/SimpleDeliverable.cpp
+ qpid/broker/SimpleDeliveryRecord.cpp
+ qpid/broker/SimpleMessage.cpp
+ qpid/broker/SimpleMessageAsyncContext.cpp
+ qpid/broker/SimpleMessageDeque.cpp
+ qpid/broker/SimpleQueue.cpp
+ qpid/broker/SimpleQueuedMessage.cpp
+ qpid/broker/SimpleTxnAccept.cpp
+ qpid/broker/SimpleTxnBuffer.cpp
+ qpid/broker/SimpleTxnPublish.cpp
qpid/broker/TxnAsyncContext.cpp
- qpid/broker/TxnBuffer.cpp
qpid/broker/TxnHandle.cpp
)
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index aa66e7adb8..2ee1d23025 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -75,7 +75,7 @@ AsyncStoreImpl::createTxnHandle()
}
qpid::broker::TxnHandle
-AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb)
+AsyncStoreImpl::createTxnHandle(qpid::broker::SimpleTxnBuffer* tb)
{
return qpid::broker::TxnHandle(new TxnHandleImpl(tb));
}
@@ -90,7 +90,7 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid,
qpid::broker::TxnHandle
AsyncStoreImpl::createTxnHandle(const std::string& xid,
const bool tpcFlag,
- qpid::broker::TxnBuffer* tb)
+ qpid::broker::SimpleTxnBuffer* tb)
{
return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tpcFlag, tb));
}
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index eb3f090ad7..40a7552a68 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -42,8 +42,7 @@ class Poller;
namespace asyncStore {
-class AsyncStoreImpl : public qpid::broker::AsyncTransactionalStore,
- public qpid::broker::AsyncStore
+class AsyncStoreImpl : public qpid::broker::AsyncStore
{
public:
AsyncStoreImpl(boost::shared_ptr<qpid::sys::Poller> poller,
@@ -59,12 +58,12 @@ public:
// --- Interface from AsyncTransactionalStore ---
qpid::broker::TxnHandle createTxnHandle();
- qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb);
+ qpid::broker::TxnHandle createTxnHandle(qpid::broker::SimpleTxnBuffer* tb);
qpid::broker::TxnHandle createTxnHandle(const std::string& xid,
const bool tpcFlag);
qpid::broker::TxnHandle createTxnHandle(const std::string& xid,
const bool tpcFlag,
- qpid::broker::TxnBuffer* tb);
+ qpid::broker::SimpleTxnBuffer* tb);
void submitPrepare(qpid::broker::TxnHandle& txnHandle,
boost::shared_ptr<qpid::broker::TpcTxnAsyncContext> TxnCtxt);
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
index dd644b29bd..50dce1b2af 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
@@ -31,7 +31,7 @@ TxnHandleImpl::TxnHandleImpl() :
m_txnBuffer(0)
{}
-TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) :
+TxnHandleImpl::TxnHandleImpl(qpid::broker::SimpleTxnBuffer* tb) :
m_tpcFlag(false),
m_txnBuffer(tb)
{}
@@ -44,7 +44,7 @@ TxnHandleImpl::TxnHandleImpl(const std::string& xid, const bool tpcFlag) :
TxnHandleImpl::TxnHandleImpl(const std::string& xid,
const bool tpcFlag,
- qpid::broker::TxnBuffer* tb) :
+ qpid::broker::SimpleTxnBuffer* tb) :
m_xid(xid),
m_tpcFlag(tpcFlag),
m_txnBuffer(tb)
diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
index e1f8afff3e..ce23665d5b 100644
--- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h
+++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h
@@ -33,7 +33,7 @@
namespace qpid {
namespace broker {
-class TxnBuffer;
+class SimpleTxnBuffer;
}
namespace asyncStore {
@@ -42,9 +42,9 @@ class TxnHandleImpl : public virtual qpid::RefCounted
{
public:
TxnHandleImpl();
- TxnHandleImpl(qpid::broker::TxnBuffer* tb);
+ TxnHandleImpl(qpid::broker::SimpleTxnBuffer* tb);
TxnHandleImpl(const std::string& xid, const bool tpcFlag);
- TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::TxnBuffer* tb);
+ TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::SimpleTxnBuffer* tb);
virtual ~TxnHandleImpl();
const std::string& getXid() const;
bool is2pc() const;
@@ -52,7 +52,7 @@ public:
private:
std::string m_xid;
bool m_tpcFlag;
- qpid::broker::TxnBuffer* const m_txnBuffer;
+ qpid::broker::SimpleTxnBuffer* const m_txnBuffer;
};
}} // namespace qpid::asyncStore
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 6f1c02e059..7009565a7c 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -70,19 +70,19 @@ class TxnHandle;
class QueueAsyncContext;
class TpcTxnAsyncContext;
class TxnAsyncContext;
-class TxnBuffer;
+class SimpleTxnBuffer;
class AsyncTransactionalStore {
public:
virtual ~AsyncTransactionalStore() {}
virtual TxnHandle createTxnHandle() = 0;
- virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0;
+ virtual TxnHandle createTxnHandle(SimpleTxnBuffer* tb) = 0;
virtual TxnHandle createTxnHandle(const std::string& xid,
const bool tpcFlag) = 0;
virtual TxnHandle createTxnHandle(const std::string& xid,
const bool tpcFlag,
- TxnBuffer* tb) = 0;
+ SimpleTxnBuffer* tb) = 0;
virtual void submitPrepare(TxnHandle&,
boost::shared_ptr<TpcTxnAsyncContext>) = 0; // Distributed txns only
@@ -94,7 +94,7 @@ public:
};
// Subclassed by store:
-class AsyncStore {
+class AsyncStore : public AsyncTransactionalStore {
public:
virtual ~AsyncStore() {}
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp
index 4bd2d271eb..02eb2e9546 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.cpp
+++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp
@@ -48,7 +48,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
{}
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
- TxnBuffer* tb,
+ SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
@@ -61,7 +61,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
- TxnBuffer* tb,
+ SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
m_q(q),
@@ -89,7 +89,7 @@ QueueAsyncContext::getMessage() const
return m_msg;
}
-TxnBuffer*
+SimpleTxnBuffer*
QueueAsyncContext::getTxnBuffer() const {
return m_tb;
}
diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h
index e9ba2ebbac..8657922377 100644
--- a/cpp/src/qpid/broker/QueueAsyncContext.h
+++ b/cpp/src/qpid/broker/QueueAsyncContext.h
@@ -52,18 +52,18 @@ public:
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
- TxnBuffer* tb,
+ SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
boost::intrusive_ptr<PersistableMessage> msg,
- TxnBuffer* tb,
+ SimpleTxnBuffer* tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
virtual ~QueueAsyncContext();
boost::shared_ptr<PersistableQueue> getQueue() const;
boost::intrusive_ptr<PersistableMessage> getMessage() const;
- TxnBuffer* getTxnBuffer() const;
+ SimpleTxnBuffer* getTxnBuffer() const;
AsyncResultQueue* getAsyncResultQueue() const;
AsyncResultCallback getAsyncResultCallback() const;
void invokeCallback(const AsyncResultHandle* const arh) const;
@@ -72,7 +72,7 @@ public:
private:
boost::shared_ptr<PersistableQueue> m_q;
boost::intrusive_ptr<PersistableMessage> m_msg;
- TxnBuffer* m_tb;
+ SimpleTxnBuffer* m_tb;
AsyncResultCallback m_rcb;
AsyncResultQueue* const m_arq;
};
diff --git a/cpp/src/qpid/broker/SimpleConsumer.h b/cpp/src/qpid/broker/SimpleConsumer.h
new file mode 100644
index 0000000000..6601c65a42
--- /dev/null
+++ b/cpp/src/qpid/broker/SimpleConsumer.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file SimpleConsumer.h
+ */
+
+#ifndef qpid_broker_SimpleConsumer_h_
+#define qpid_broker_SimpleConsumer_h_
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class SimpleDeliveryRecord;
+
+class SimpleConsumer {
+public:
+ virtual ~SimpleConsumer() {}
+ virtual void commitComplete() = 0;
+ virtual void record(boost::shared_ptr<SimpleDeliveryRecord> dr) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif // qpid_broker_SimpleConsumer_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp b/cpp/src/qpid/broker/SimpleDeliverable.cpp
index 9da7e348e0..7037a377c5 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.cpp
+++ b/cpp/src/qpid/broker/SimpleDeliverable.cpp
@@ -18,26 +18,23 @@
*/
/**
- * \file Deliverable.cpp
+ * \file SimpleDeliverable.cpp
*/
-#include "Deliverable.h"
+#include "SimpleDeliverable.h"
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
-Deliverable::Deliverable() :
+SimpleDeliverable::SimpleDeliverable() :
m_delivered(false)
{}
-Deliverable::~Deliverable()
-{}
+SimpleDeliverable::~SimpleDeliverable() {}
bool
-Deliverable::isDelivered() const
-{
+SimpleDeliverable::isDelivered() const {
return m_delivered;
}
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h b/cpp/src/qpid/broker/SimpleDeliverable.h
index 990d53a199..6441e14841 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/Deliverable.h
+++ b/cpp/src/qpid/broker/SimpleDeliverable.h
@@ -18,27 +18,26 @@
*/
/**
- * \file Deliverable.h
+ * \file SimpleDeliverable.h
*/
-#ifndef tests_storePerftools_asyncPerf_Deliverable_h_
-#define tests_storePerftools_asyncPerf_Deliverable_h_
+#ifndef qpid_broker_SimpleDeliverable_h_
+#define qpid_broker_SimpleDeliverable_h_
#include <boost/shared_ptr.hpp>
#include <stdint.h> // uint64_t
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
class SimpleMessage;
class SimpleQueue;
-class Deliverable
+class SimpleDeliverable
{
public:
- Deliverable();
- virtual ~Deliverable();
+ SimpleDeliverable();
+ virtual ~SimpleDeliverable();
virtual uint64_t contentSize() = 0;
virtual void deliverTo(const boost::shared_ptr<SimpleQueue>& queue) = 0;
@@ -49,6 +48,6 @@ protected:
bool m_delivered;
};
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
-#endif // tests_storePerftools_asyncPerf_Deliverable_h_
+#endif // qpid_broker_SimpleDeliverable_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp
index 6f33369a26..b71df6975b 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp
@@ -18,35 +18,32 @@
*/
/**
- * \file DeliveryRecord.cpp
+ * \file SimpleDeliveryRecord.cpp
*/
-#include "DeliveryRecord.h"
+#include "SimpleDeliveryRecord.h"
-#include "MessageConsumer.h"
-#include "QueuedMessage.h"
+#include "SimpleConsumer.h"
#include "SimpleMessage.h"
#include "SimpleQueue.h"
+#include "SimpleQueuedMessage.h"
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
-DeliveryRecord::DeliveryRecord(boost::shared_ptr<QueuedMessage> qm,
- MessageConsumer& mc,
- bool accepted) :
+SimpleDeliveryRecord::SimpleDeliveryRecord(boost::shared_ptr<SimpleQueuedMessage> qm,
+ SimpleConsumer& sc,
+ bool accepted) :
m_queuedMessage(qm),
- m_msgConsumer(mc),
+ m_msgConsumer(sc),
m_accepted(accepted),
m_ended(accepted)
{}
-DeliveryRecord::~DeliveryRecord()
-{}
+SimpleDeliveryRecord::~SimpleDeliveryRecord() {}
bool
-DeliveryRecord::accept()
-{
+SimpleDeliveryRecord::accept() {
if (!m_ended) {
m_queuedMessage->getQueue()->dequeue(m_queuedMessage);
m_accepted = true;
@@ -56,47 +53,40 @@ DeliveryRecord::accept()
}
bool
-DeliveryRecord::isAccepted() const
-{
+SimpleDeliveryRecord::isAccepted() const {
return m_accepted;
}
bool
-DeliveryRecord::setEnded()
-{
+SimpleDeliveryRecord::setEnded() {
m_ended = true;
m_queuedMessage->payload() = boost::intrusive_ptr<SimpleMessage>(0);
return isRedundant();
}
bool
-DeliveryRecord::isEnded() const
-{
+SimpleDeliveryRecord::isEnded() const {
return m_ended;
}
bool
-DeliveryRecord::isRedundant() const
-{
+SimpleDeliveryRecord::isRedundant() const {
return m_ended;
}
void
-DeliveryRecord::dequeue(qpid::broker::TxnBuffer* tb)
-{
+SimpleDeliveryRecord::dequeue(qpid::broker::SimpleTxnBuffer* tb) {
m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage);
}
void
-DeliveryRecord::committed() const
-{
+SimpleDeliveryRecord::committed() const {
m_msgConsumer.commitComplete();
}
-boost::shared_ptr<QueuedMessage>
-DeliveryRecord::getQueuedMessage() const
-{
+boost::shared_ptr<SimpleQueuedMessage>
+SimpleDeliveryRecord::getQueuedMessage() const {
return m_queuedMessage;
}
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/qpid/broker/SimpleDeliveryRecord.h
index 6c5d87f374..622ce578d7 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.h
@@ -18,49 +18,42 @@
*/
/**
- * \file DeliveryRecord.h
+ * \file SimpleDeliveryRecord.h
*/
-#ifndef tests_storePerftools_asyncPerf_DeliveryRecord_h_
-#define tests_storePerftools_asyncPerf_DeliveryRecord_h_
-
-//#include "QueuedMessage.h"
+#ifndef qpid_broker_SimpleDeliveryRecord_h_
+#define qpid_broker_SimpleDeliveryRecord_h_
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
-class TxnBuffer;
-}}
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-class MessageConsumer;
-class QueuedMessage;
+class SimpleConsumer;
+class SimpleQueuedMessage;
+class SimpleTxnBuffer;
-class DeliveryRecord {
+class SimpleDeliveryRecord {
public:
- DeliveryRecord(boost::shared_ptr<QueuedMessage> qm,
- MessageConsumer& mc,
- bool accepted);
- virtual ~DeliveryRecord();
+ SimpleDeliveryRecord(boost::shared_ptr<SimpleQueuedMessage> qm,
+ SimpleConsumer& sc,
+ bool accepted);
+ virtual ~SimpleDeliveryRecord();
bool accept();
bool isAccepted() const;
bool setEnded();
bool isEnded() const;
bool isRedundant() const;
- void dequeue(qpid::broker::TxnBuffer* tb);
+ void dequeue(qpid::broker::SimpleTxnBuffer* tb);
void committed() const;
- boost::shared_ptr<QueuedMessage> getQueuedMessage() const;
+ boost::shared_ptr<SimpleQueuedMessage> getQueuedMessage() const;
private:
- boost::shared_ptr<QueuedMessage> m_queuedMessage;
- MessageConsumer& m_msgConsumer;
+ boost::shared_ptr<SimpleQueuedMessage> m_queuedMessage;
+ SimpleConsumer& m_msgConsumer;
bool m_accepted : 1;
bool m_ended : 1;
};
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
-#endif // tests_storePerftools_asyncPerf_DeliveryRecord_h_
+#endif // qpid_broker_SimpleDeliveryRecord_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp b/cpp/src/qpid/broker/SimpleMessage.cpp
index bacf438b9f..1239533edf 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.cpp
+++ b/cpp/src/qpid/broker/SimpleMessage.cpp
@@ -25,98 +25,84 @@
#include <string.h> // memcpy()
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
SimpleMessage::SimpleMessage(const char* msgData,
const uint32_t msgSize) :
m_persistenceId(0ULL),
m_msg(msgData, static_cast<size_t>(msgSize)),
m_store(0),
- m_msgHandle(qpid::broker::MessageHandle())
+ m_msgHandle(MessageHandle())
{}
SimpleMessage::SimpleMessage(const char* msgData,
const uint32_t msgSize,
- qpid::broker::AsyncStore* store) :
+ AsyncStore* store) :
m_persistenceId(0ULL),
m_msg(msgData, static_cast<size_t>(msgSize)),
m_store(store),
- m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle())
+ m_msgHandle(store ? store->createMessageHandle(this) : MessageHandle())
{}
-SimpleMessage::~SimpleMessage()
-{}
+SimpleMessage::~SimpleMessage() {}
-const qpid::broker::MessageHandle&
-SimpleMessage::getHandle() const
-{
+const MessageHandle&
+SimpleMessage::getHandle() const {
return m_msgHandle;
}
-qpid::broker::MessageHandle&
-SimpleMessage::getHandle()
-{
+MessageHandle&
+SimpleMessage::getHandle() {
return m_msgHandle;
}
uint64_t
-SimpleMessage::contentSize() const
-{
+SimpleMessage::contentSize() const {
return static_cast<uint64_t>(m_msg.size());
}
void
-SimpleMessage::setPersistenceId(uint64_t id) const
-{
+SimpleMessage::setPersistenceId(uint64_t id) const {
m_persistenceId = id;
}
uint64_t
-SimpleMessage::getPersistenceId() const
-{
+SimpleMessage::getPersistenceId() const {
return m_persistenceId;
}
void
-SimpleMessage::encode(qpid::framing::Buffer& buffer) const
-{
+SimpleMessage::encode(qpid::framing::Buffer& buffer) const {
buffer.putRawData(m_msg);
}
uint32_t
-SimpleMessage::encodedSize() const
-{
+SimpleMessage::encodedSize() const {
return static_cast<uint32_t>(m_msg.size());
}
void
-SimpleMessage::allDequeuesComplete()
-{}
+SimpleMessage::allDequeuesComplete() {}
uint32_t
-SimpleMessage::encodedHeaderSize() const
-{
+SimpleMessage::encodedHeaderSize() const {
return 0;
}
bool
-SimpleMessage::isPersistent() const
-{
+SimpleMessage::isPersistent() const {
return m_store != 0;
}
uint64_t
-SimpleMessage::getSize()
-{
+SimpleMessage::getSize() {
return m_msg.size();
}
void
-SimpleMessage::write(char* target)
-{
+SimpleMessage::write(char* target) {
::memcpy(target, m_msg.data(), m_msg.size());
}
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h b/cpp/src/qpid/broker/SimpleMessage.h
index 169f5a8959..edfaa8d13b 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleMessage.h
+++ b/cpp/src/qpid/broker/SimpleMessage.h
@@ -21,29 +21,28 @@
* \file SimpleMessage.h
*/
-#ifndef tests_storePerftools_asyncPerf_SimpleMessage_h_
-#define tests_storePerftools_asyncPerf_SimpleMessage_h_
+#ifndef qpid_broker_SimpleMessage_h_
+#define qpid_broker_SimpleMessage_h_
-#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
-#include "qpid/broker/MessageHandle.h"
-#include "qpid/broker/PersistableMessage.h"
+#include "AsyncStore.h" // DataSource
+#include "MessageHandle.h"
+#include "PersistableMessage.h"
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
-class SimpleMessage: public qpid::broker::PersistableMessage,
- public qpid::broker::DataSource
+class SimpleMessage: public PersistableMessage,
+ public DataSource
{
public:
SimpleMessage(const char* msgData,
const uint32_t msgSize);
SimpleMessage(const char* msgData,
const uint32_t msgSize,
- qpid::broker::AsyncStore* store);
+ AsyncStore* store);
virtual ~SimpleMessage();
- const qpid::broker::MessageHandle& getHandle() const;
- qpid::broker::MessageHandle& getHandle();
+ const MessageHandle& getHandle() const;
+ MessageHandle& getHandle();
uint64_t contentSize() const;
// --- Interface Persistable ---
@@ -64,11 +63,11 @@ public:
private:
mutable uint64_t m_persistenceId;
const std::string m_msg;
- qpid::broker::AsyncStore* m_store;
+ AsyncStore* m_store;
- qpid::broker::MessageHandle m_msgHandle;
+ MessageHandle m_msgHandle;
};
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
-#endif // tests_storePerftools_asyncPerf_SimpleMessage_h_
+#endif // qpid_broker_SimpleMessage_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp
index e3bfe9ae7a..a88258f5bc 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
+++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp
@@ -18,21 +18,20 @@
*/
/**
- * \file MessageContext.cpp
+ * \file SimpleMessageAsyncContext.cpp
*/
-#include "MessageAsyncContext.h"
+#include "SimpleMessageAsyncContext.h"
#include "SimpleMessage.h"
#include <cassert>
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
-MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
- boost::shared_ptr<SimpleQueue> q) :
+SimpleMessageAsyncContext::SimpleMessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
+ boost::shared_ptr<SimpleQueue> q) :
m_msg(msg),
m_q(q)
{
@@ -40,25 +39,21 @@ MessageAsyncContext::MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg
assert(m_q.get() != 0);
}
-MessageAsyncContext::~MessageAsyncContext()
-{}
+SimpleMessageAsyncContext::~SimpleMessageAsyncContext() {}
boost::intrusive_ptr<SimpleMessage>
-MessageAsyncContext::getMessage() const
-{
+SimpleMessageAsyncContext::getMessage() const {
return m_msg;
}
boost::shared_ptr<SimpleQueue>
-MessageAsyncContext::getQueue() const
-{
+SimpleMessageAsyncContext::getQueue() const {
return m_q;
}
void
-MessageAsyncContext::destroy()
-{
+SimpleMessageAsyncContext::destroy() {
delete this;
}
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h
index 9252fbda45..e3975e790e 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
+++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h
@@ -18,30 +18,29 @@
*/
/**
- * \file MessageContext.h
+ * \file SimpleMessageAsyncContext.h
*/
-#ifndef tests_storePerfTools_asyncPerf_MessageContext_h_
-#define tests_storePerfTools_asyncPerf_MessageContext_h_
+#ifndef qpid_broker_SimpleMessageAsyncContext_h_
+#define qpid_broker_SimpleMessageAsyncContext_h_
-#include "qpid/asyncStore/AsyncOperation.h"
+#include "AsyncStore.h" // BrokerAsyncContext
#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
class SimpleMessage;
class SimpleQueue;
-class MessageAsyncContext : public qpid::broker::BrokerAsyncContext
+class SimpleMessageAsyncContext : public BrokerAsyncContext
{
public:
- MessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
- boost::shared_ptr<SimpleQueue> q);
- virtual ~MessageAsyncContext();
+ SimpleMessageAsyncContext(boost::intrusive_ptr<SimpleMessage> msg,
+ boost::shared_ptr<SimpleQueue> q);
+ virtual ~SimpleMessageAsyncContext();
boost::intrusive_ptr<SimpleMessage> getMessage() const;
boost::shared_ptr<SimpleQueue> getQueue() const;
void destroy();
@@ -51,6 +50,6 @@ private:
boost::shared_ptr<SimpleQueue> m_q;
};
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
-#endif // tests_storePerfTools_asyncPerf_MessageContext_h_
+#endif // qpid_broker_SimpleMessageAsyncContext_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/qpid/broker/SimpleMessageDeque.cpp
index 1fa2c087ac..0aadcfd94a 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
+++ b/cpp/src/qpid/broker/SimpleMessageDeque.cpp
@@ -18,41 +18,35 @@
*/
/**
- * \file MessageDeque.cpp
+ * \file SimpleMessageDeque.cpp
*/
-#include "MessageDeque.h"
+#include "SimpleMessageDeque.h"
-#include "QueuedMessage.h"
+#include "SimpleQueuedMessage.h"
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
-MessageDeque::MessageDeque()
-{}
+SimpleMessageDeque::SimpleMessageDeque() {}
-MessageDeque::~MessageDeque()
-{}
+SimpleMessageDeque::~SimpleMessageDeque() {}
uint32_t
-MessageDeque::size()
-{
+SimpleMessageDeque::size() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
return m_messages.size();
}
bool
-MessageDeque::push(boost::shared_ptr<QueuedMessage>& added)
-{
+SimpleMessageDeque::push(boost::shared_ptr<SimpleQueuedMessage>& added) {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
m_messages.push_back(added);
return false;
}
bool
-MessageDeque::consume(boost::shared_ptr<QueuedMessage>& msg)
-{
+SimpleMessageDeque::consume(boost::shared_ptr<SimpleQueuedMessage>& msg) {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
if (!m_messages.empty()) {
msg = m_messages.front();
@@ -62,4 +56,4 @@ MessageDeque::consume(boost::shared_ptr<QueuedMessage>& msg)
return false;
}
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/qpid/broker/SimpleMessageDeque.h
index 021015f3e0..5db0755a43 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
+++ b/cpp/src/qpid/broker/SimpleMessageDeque.h
@@ -18,42 +18,40 @@
*/
/**
- * \file MessageDeque.h
+ * \file SimpleMessageDeque.h
*/
/*
* This is a copy of qpid::broker::MessageDeque.h, but using the local
- * tests::storePerftools::asyncPerf::QueuedMessage class instead of
- * qpid::broker::QueuedMessage.
+ * SimpleQueuedMessage class instead of QueuedMessage.
*/
-#ifndef tests_storePerftools_asyncPerf_MessageDeque_h_
-#define tests_storePerftools_asyncPerf_MessageDeque_h_
+#ifndef qpid_broker_SimpleMessageDeque_h_
+#define qpid_broker_SimpleMessageDeque_h_
-#include "Messages.h"
+#include "SimpleMessages.h"
#include "qpid/sys/Mutex.h"
#include <deque>
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
-class MessageDeque : public Messages
+class SimpleMessageDeque : public SimpleMessages
{
public:
- MessageDeque();
- virtual ~MessageDeque();
+ SimpleMessageDeque();
+ virtual ~SimpleMessageDeque();
uint32_t size();
- bool push(boost::shared_ptr<QueuedMessage>& added);
- bool consume(boost::shared_ptr<QueuedMessage>& msg);
+ bool push(boost::shared_ptr<SimpleQueuedMessage>& added);
+ bool consume(boost::shared_ptr<SimpleQueuedMessage>& msg);
private:
- std::deque<boost::shared_ptr<QueuedMessage> > m_messages;
+ std::deque<boost::shared_ptr<SimpleQueuedMessage> > m_messages;
qpid::sys::Mutex m_msgMutex;
};
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
-#endif // tests_storePerftools_asyncPerf_MessageDeque_h_
+#endif // qpid_broker_SimpleMessageDeque_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/qpid/broker/SimpleMessages.h
index c1bfa328ea..2a40859032 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/Messages.h
+++ b/cpp/src/qpid/broker/SimpleMessages.h
@@ -18,7 +18,7 @@
*/
/**
- * \file Messages.h
+ * \file SimpleMessages.h
*/
/*
@@ -27,27 +27,26 @@
* qpid::broker::QueuedMessage.
*/
-#ifndef tests_storePerftools_asyncPerf_Messages_h_
-#define tests_storePerftools_asyncPerf_Messages_h_
+#ifndef qpid_broker_SimpleMessages_h_
+#define qpid_broker_SimpleMessages_h_
#include <boost/shared_ptr.hpp>
#include <stdint.h>
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
-class QueuedMessage;
+class SimpleQueuedMessage;
-class Messages
+class SimpleMessages
{
public:
- virtual ~Messages() {}
+ virtual ~SimpleMessages() {}
virtual uint32_t size() = 0;
- virtual bool push(boost::shared_ptr<QueuedMessage>& added) = 0;
- virtual bool consume(boost::shared_ptr<QueuedMessage>& msg) = 0;
+ virtual bool push(boost::shared_ptr<SimpleQueuedMessage>& added) = 0;
+ virtual bool consume(boost::shared_ptr<SimpleQueuedMessage>& msg) = 0;
};
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
-#endif // tests_storePerftools_asyncPerf_Messages_h_
+#endif // qpid_broker_SimpleMessages_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/qpid/broker/SimpleQueue.cpp
index 06b4e9333f..5cd8841f94 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/qpid/broker/SimpleQueue.cpp
@@ -23,31 +23,29 @@
#include "SimpleQueue.h"
-#include "DeliveryRecord.h"
-#include "MessageConsumer.h"
-#include "MessageDeque.h"
-#include "QueuedMessage.h"
+#include "AsyncResultHandle.h"
+#include "QueueAsyncContext.h"
+#include "SimpleConsumer.h"
+#include "SimpleDeliveryRecord.h"
#include "SimpleMessage.h"
-
-#include "qpid/broker/AsyncResultHandle.h"
-#include "qpid/broker/QueueAsyncContext.h"
-#include "qpid/broker/TxnBuffer.h"
+#include "SimpleMessageDeque.h"
+#include "SimpleQueuedMessage.h"
+#include "SimpleTxnBuffer.h"
#include <string.h> // memcpy()
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
//static
-qpid::broker::TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations
+TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations
SimpleQueue::SimpleQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
- qpid::broker::AsyncStore* store,
- qpid::broker::AsyncResultQueue& arq) :
- qpid::broker::PersistableQueue(),
+ AsyncStore* store,
+ AsyncResultQueue& arq) :
+ PersistableQueue(),
m_name(name),
m_store(store),
m_resultQueue(arq),
@@ -57,7 +55,7 @@ SimpleQueue::SimpleQueue(const std::string& name,
m_destroyPending(false),
m_destroyed(false),
m_barrier(*this),
- m_messages(new MessageDeque())
+ m_messages(new SimpleMessageDeque())
{
if (m_store != 0) {
const qpid::types::Variant::Map qo;
@@ -67,17 +65,17 @@ SimpleQueue::SimpleQueue(const std::string& name,
SimpleQueue::~SimpleQueue() {}
-const qpid::broker::QueueHandle&
+const QueueHandle&
SimpleQueue::getHandle() const {
return m_queueHandle;
}
-qpid::broker::QueueHandle&
+QueueHandle&
SimpleQueue::getHandle() {
return m_queueHandle;
}
-qpid::broker::AsyncStore*
+AsyncStore*
SimpleQueue::getStore() {
return m_store;
}
@@ -85,9 +83,9 @@ SimpleQueue::getStore() {
void
SimpleQueue::asyncCreate() {
if (m_store) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- &handleAsyncCreateResult,
- &m_resultQueue));
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ &handleAsyncCreateResult,
+ &m_resultQueue));
m_store->submitCreate(m_queueHandle, this, qac);
++m_asyncOpCounter;
}
@@ -95,10 +93,9 @@ SimpleQueue::asyncCreate() {
//static
void
-SimpleQueue::handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh) {
+SimpleQueue::handleAsyncCreateResult(const AsyncResultHandle* const arh) {
if (arh) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
- boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing a message)
@@ -116,9 +113,9 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
m_destroyPending = true;
if (m_store) {
if (deleteQueue) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- &handleAsyncDestroyResult,
- &m_resultQueue));
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ &handleAsyncDestroyResult,
+ &m_resultQueue));
m_store->submitDestroy(m_queueHandle, qac);
++m_asyncOpCounter;
}
@@ -128,10 +125,10 @@ SimpleQueue::asyncDestroy(const bool deleteQueue)
//static
void
-SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh) {
+SimpleQueue::handleAsyncDestroyResult(const AsyncResultHandle* const arh) {
if (arh) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
- boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<QueueAsyncContext> qc =
+ boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing a message)
@@ -145,30 +142,30 @@ SimpleQueue::handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* con
void
SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) {
- boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
+ boost::shared_ptr<SimpleQueuedMessage> qm(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(this, msg)));
enqueue(qm);
push(qm);
}
bool
-SimpleQueue::dispatch(MessageConsumer& mc) {
- boost::shared_ptr<QueuedMessage> qm;
+SimpleQueue::dispatch(SimpleConsumer& sc) {
+ boost::shared_ptr<SimpleQueuedMessage> qm;
if (m_messages->consume(qm)) {
- boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false));
- mc.record(dr);
+ boost::shared_ptr<SimpleDeliveryRecord> dr(new SimpleDeliveryRecord(qm, sc, false));
+ sc.record(dr);
return true;
}
return false;
}
bool
-SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) {
+SimpleQueue::enqueue(boost::shared_ptr<SimpleQueuedMessage> qm) {
return enqueue(0, qm);
}
bool
-SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm) {
+SimpleQueue::enqueue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm) {
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
@@ -181,13 +178,13 @@ SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb,
}
bool
-SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) {
+SimpleQueue::dequeue(boost::shared_ptr<SimpleQueuedMessage> qm) {
return dequeue(0, qm);
}
bool
-SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm) {
+SimpleQueue::dequeue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm) {
ScopedUse u(m_barrier);
if (!u.m_acquired) {
return false;
@@ -201,7 +198,7 @@ SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb,
void
SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) {
- push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
+ push(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(this, msg)));
}
void
@@ -238,7 +235,7 @@ SimpleQueue::getName() const {
}
void
-SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) {
+SimpleQueue::setExternalQueueStore(ExternalQueueStore* inst) {
if (externalQueueStore != inst && externalQueueStore)
delete externalQueueStore;
externalQueueStore = inst;
@@ -264,8 +261,7 @@ SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) :
// protected
bool
-SimpleQueue::UsageBarrier::acquire()
-{
+SimpleQueue::UsageBarrier::acquire() {
qpid::sys::Monitor::ScopedLock l(m_monitor);
if (m_parent.m_destroyed) {
return false;
@@ -276,8 +272,7 @@ SimpleQueue::UsageBarrier::acquire()
}
// protected
-void SimpleQueue::UsageBarrier::release()
-{
+void SimpleQueue::UsageBarrier::release() {
qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
if (--m_count == 0) {
m_monitor.notifyAll();
@@ -285,8 +280,7 @@ void SimpleQueue::UsageBarrier::release()
}
// protected
-void SimpleQueue::UsageBarrier::destroy()
-{
+void SimpleQueue::UsageBarrier::destroy() {
qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
m_parent.m_destroyed = true;
while (m_count) {
@@ -301,8 +295,7 @@ SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) :
{}
// protected
-SimpleQueue::ScopedUse::~ScopedUse()
-{
+SimpleQueue::ScopedUse::~ScopedUse() {
if (m_acquired) {
m_barrier.release();
}
@@ -310,9 +303,8 @@ SimpleQueue::ScopedUse::~ScopedUse()
// private
void
-SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
- bool /*isRecovery*/)
-{
+SimpleQueue::push(boost::shared_ptr<SimpleQueuedMessage> qm,
+ bool /*isRecovery*/) {
m_messages->push(qm);
}
@@ -320,14 +312,14 @@ SimpleQueue::push(boost::shared_ptr<QueuedMessage> qm,
// private
bool
-SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm) {
+SimpleQueue::asyncEnqueue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm) {
assert(qm.get());
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- qm->payload(),
- tb,
- &handleAsyncEnqueueResult,
- &m_resultQueue));
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ qm->payload(),
+ tb,
+ &handleAsyncEnqueueResult,
+ &m_resultQueue));
if (tb) {
tb->incrOpCnt();
m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac);
@@ -340,10 +332,10 @@ SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb,
// private static
void
-SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh) {
+SimpleQueue::handleAsyncEnqueueResult(const AsyncResultHandle* const arh) {
if (arh) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
- boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<QueueAsyncContext> qc =
+ boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing a message)
@@ -357,15 +349,14 @@ SimpleQueue::handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* con
// private
bool
-SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::asyncDequeue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm) {
assert(qm.get());
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
- qm->payload(),
- tb,
- &handleAsyncDequeueResult,
- &m_resultQueue));
+ boost::shared_ptr<QueueAsyncContext> qac(new QueueAsyncContext(shared_from_this(),
+ qm->payload(),
+ tb,
+ &handleAsyncDequeueResult,
+ &m_resultQueue));
if (tb) {
tb->incrOpCnt();
m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac);
@@ -375,12 +366,12 @@ SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::br
++m_asyncOpCounter;
return true;
}
+
// private static
void
-SimpleQueue::handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh) {
+SimpleQueue::handleAsyncDequeueResult(const AsyncResultHandle* const arh) {
if (arh) {
- boost::shared_ptr<qpid::broker::QueueAsyncContext> qc =
- boost::dynamic_pointer_cast<qpid::broker::QueueAsyncContext>(arh->getBrokerAsyncContext());
+ boost::shared_ptr<QueueAsyncContext> qc = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
boost::shared_ptr<SimpleQueue> sq = boost::dynamic_pointer_cast<SimpleQueue>(qc->getQueue());
if (arh->getErrNo()) {
// TODO: Handle async failure here (other than by simply printing a message)
@@ -404,7 +395,7 @@ SimpleQueue::destroyCheck(const std::string& opDescr) const {
// private
void
-SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+SimpleQueue::createComplete(const boost::shared_ptr<QueueAsyncContext> qc) {
if (qc.get()) {
assert(qc->getQueue().get() == this);
}
@@ -413,7 +404,7 @@ SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncCont
// private
void
-SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+SimpleQueue::flushComplete(const boost::shared_ptr<QueueAsyncContext> qc) {
if (qc.get()) {
assert(qc->getQueue().get() == this);
}
@@ -422,7 +413,7 @@ SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncConte
// private
void
-SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+SimpleQueue::destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc) {
if (qc.get()) {
assert(qc->getQueue().get() == this);
}
@@ -432,7 +423,7 @@ SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncCon
// private
void
-SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+SimpleQueue::enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc) {
if (qc.get()) {
assert(qc->getQueue().get() == this);
if (qc->getTxnBuffer()) { // transactional enqueue
@@ -444,7 +435,7 @@ SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncCon
// private
void
-SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+SimpleQueue::dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc) {
if (qc.get()) {
assert(qc->getQueue().get() == this);
if (qc->getTxnBuffer()) { // transactional enqueue
@@ -454,4 +445,4 @@ SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncCon
--m_asyncOpCounter;
}
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/qpid/broker/SimpleQueue.h
index 5f64c9b960..c2f21076cd 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
+++ b/cpp/src/qpid/broker/SimpleQueue.h
@@ -21,11 +21,11 @@
* \file SimpleQueue.h
*/
-#ifndef tests_storePerftools_asyncPerf_SimpleQueue_h_
-#define tests_storePerftools_asyncPerf_SimpleQueue_h_
+#ifndef qpid_broker_SimpleQueue_h_
+#define qpid_broker_SimpleQueue_h_
#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter
-#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
+#include "qpid/broker/AsyncStore.h" // DataSource
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/QueueHandle.h"
#include "qpid/sys/Monitor.h"
@@ -35,53 +35,50 @@
#include <boost/enable_shared_from_this.hpp>
namespace qpid {
-namespace broker {
-class AsyncResultQueue;
-class QueueAsyncContext;
-class TxnBuffer;
-}
+
namespace framing {
class FieldTable;
-}}
+}
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace broker {
-class MessageConsumer;
-class Messages;
-class QueuedMessage;
+class AsyncResultQueue;
+class QueueAsyncContext;
+class SimpleConsumer;
+class SimpleMessages;
+class SimpleQueuedMessage;
class SimpleMessage;
+class SimpleTxnBuffer;
class SimpleQueue : public boost::enable_shared_from_this<SimpleQueue>,
- public qpid::broker::PersistableQueue,
- public qpid::broker::DataSource
+ public PersistableQueue,
+ public DataSource
{
public:
SimpleQueue(const std::string& name,
const qpid::framing::FieldTable& args,
- qpid::broker::AsyncStore* store,
- qpid::broker::AsyncResultQueue& arq);
+ AsyncStore* store,
+ AsyncResultQueue& arq);
virtual ~SimpleQueue();
- const qpid::broker::QueueHandle& getHandle() const;
- qpid::broker::QueueHandle& getHandle();
- qpid::broker::AsyncStore* getStore();
+ const QueueHandle& getHandle() const;
+ QueueHandle& getHandle();
+ AsyncStore* getStore();
void asyncCreate();
- static void handleAsyncCreateResult(const qpid::broker::AsyncResultHandle* const arh);
+ static void handleAsyncCreateResult(const AsyncResultHandle* const arh);
void asyncDestroy(const bool deleteQueue);
- static void handleAsyncDestroyResult(const qpid::broker::AsyncResultHandle* const arh);
+ static void handleAsyncDestroyResult(const AsyncResultHandle* const arh);
// --- Methods in msg handling path from qpid::Queue ---
void deliver(boost::intrusive_ptr<SimpleMessage> msg);
- bool dispatch(MessageConsumer& mc);
- bool enqueue(boost::shared_ptr<QueuedMessage> qm);
- bool enqueue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm);
- bool dequeue(boost::shared_ptr<QueuedMessage> qm);
- bool dequeue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm);
+ bool dispatch(SimpleConsumer& sc);
+ bool enqueue(boost::shared_ptr<SimpleQueuedMessage> qm);
+ bool enqueue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm);
+ bool dequeue(boost::shared_ptr<SimpleQueuedMessage> qm);
+ bool dequeue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm);
void process(boost::intrusive_ptr<SimpleMessage> msg);
void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg);
@@ -94,22 +91,22 @@ public:
// --- Interface qpid::broker::PersistableQueue ---
virtual void flush();
virtual const std::string& getName() const;
- virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst);
+ virtual void setExternalQueueStore(ExternalQueueStore* inst);
// --- Interface qpid::broker::DataStore ---
virtual uint64_t getSize();
virtual void write(char* target);
private:
- static qpid::broker::TxnHandle s_nullTxnHandle; // used for non-txn operations
+ static TxnHandle s_nullTxnHandle; // used for non-txn operations
const std::string m_name;
- qpid::broker::AsyncStore* m_store;
- qpid::broker::AsyncResultQueue& m_resultQueue;
+ AsyncStore* m_store;
+ AsyncResultQueue& m_resultQueue;
qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter!
mutable uint64_t m_persistenceId;
std::string m_persistableData;
- qpid::broker::QueueHandle m_queueHandle;
+ QueueHandle m_queueHandle;
bool m_destroyPending;
bool m_destroyed;
@@ -130,29 +127,29 @@ private:
~ScopedUse();
};
UsageBarrier m_barrier;
- std::auto_ptr<Messages> m_messages;
- void push(boost::shared_ptr<QueuedMessage> qm,
+ std::auto_ptr<SimpleMessages> m_messages;
+ void push(boost::shared_ptr<SimpleQueuedMessage> qm,
bool isRecovery = false);
// -- Async ops ---
- bool asyncEnqueue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm);
- static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh);
- bool asyncDequeue(qpid::broker::TxnBuffer* tb,
- boost::shared_ptr<QueuedMessage> qm);
- static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh);
+ bool asyncEnqueue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm);
+ static void handleAsyncEnqueueResult(const AsyncResultHandle* const arh);
+ bool asyncDequeue(SimpleTxnBuffer* tb,
+ boost::shared_ptr<SimpleQueuedMessage> qm);
+ static void handleAsyncDequeueResult(const AsyncResultHandle* const arh);
// --- Async op counter ---
void destroyCheck(const std::string& opDescr) const;
// --- Async op completions (called through handleAsyncResult) ---
- void createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
- void flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
- void destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
- void enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
- void dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc);
+ void createComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void flushComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void destroyComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void enqueueComplete(const boost::shared_ptr<QueueAsyncContext> qc);
+ void dequeueComplete(const boost::shared_ptr<QueueAsyncContext> qc);
};
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
-#endif // tests_storePerftools_asyncPerf_SimpleQueue_h_
+#endif // qpid_broker_SimpleQueue_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp
index 0d16248c7f..35ac799ecc 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp
@@ -18,27 +18,24 @@
*/
/**
- * \file QueuedMessage.cpp
+ * \file SimpleQueuedMessage.cpp
*/
-#include "QueuedMessage.h"
+#include "SimpleQueuedMessage.h"
#include "SimpleMessage.h"
#include "SimpleQueue.h"
-#include "qpid/asyncStore/AsyncStoreImpl.h"
+namespace qpid {
+namespace broker {
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
-QueuedMessage::QueuedMessage() :
+SimpleQueuedMessage::SimpleQueuedMessage() :
m_queue(0)
{}
-QueuedMessage::QueuedMessage(SimpleQueue* q,
- boost::intrusive_ptr<SimpleMessage> msg) :
- boost::enable_shared_from_this<QueuedMessage>(),
+SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueue* q,
+ boost::intrusive_ptr<SimpleMessage> msg) :
+ boost::enable_shared_from_this<SimpleQueuedMessage>(),
m_queue(q),
m_msg(msg)
{
@@ -47,63 +44,55 @@ QueuedMessage::QueuedMessage(SimpleQueue* q,
}
}
-QueuedMessage::QueuedMessage(const QueuedMessage& qm) :
- boost::enable_shared_from_this<QueuedMessage>(),
+SimpleQueuedMessage::SimpleQueuedMessage(const SimpleQueuedMessage& qm) :
+ boost::enable_shared_from_this<SimpleQueuedMessage>(),
m_queue(qm.m_queue),
m_msg(qm.m_msg),
m_enqHandle(qm.m_enqHandle)
{}
-QueuedMessage::QueuedMessage(QueuedMessage* const qm) :
- boost::enable_shared_from_this<QueuedMessage>(),
+SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueuedMessage* const qm) :
+ boost::enable_shared_from_this<SimpleQueuedMessage>(),
m_queue(qm->m_queue),
m_msg(qm->m_msg),
m_enqHandle(qm->m_enqHandle)
{}
-QueuedMessage::~QueuedMessage()
-{}
+SimpleQueuedMessage::~SimpleQueuedMessage() {}
SimpleQueue*
-QueuedMessage::getQueue() const
-{
+SimpleQueuedMessage::getQueue() const {
return m_queue;
}
boost::intrusive_ptr<SimpleMessage>
-QueuedMessage::payload() const
-{
+SimpleQueuedMessage::payload() const {
return m_msg;
}
-const qpid::broker::EnqueueHandle&
-QueuedMessage::enqHandle() const
-{
+const EnqueueHandle&
+SimpleQueuedMessage::enqHandle() const {
return m_enqHandle;
}
-qpid::broker::EnqueueHandle&
-QueuedMessage::enqHandle()
-{
+EnqueueHandle&
+SimpleQueuedMessage::enqHandle() {
return m_enqHandle;
}
void
-QueuedMessage::prepareEnqueue(qpid::broker::TxnBuffer* tb)
-{
+SimpleQueuedMessage::prepareEnqueue(SimpleTxnBuffer* tb) {
m_queue->enqueue(tb, shared_from_this());
}
void
-QueuedMessage::commitEnqueue()
-{
+SimpleQueuedMessage::commitEnqueue() {
m_queue->process(m_msg);
}
void
-QueuedMessage::abortEnqueue()
-{
+SimpleQueuedMessage::abortEnqueue() {
m_queue->enqueueAborted(m_msg);
}
-}}} // namespace tests::storePerfTools
+}} // namespace qpid::broker
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/qpid/broker/SimpleQueuedMessage.h
index 630fe1aedc..1172eb73f3 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
+++ b/cpp/src/qpid/broker/SimpleQueuedMessage.h
@@ -18,14 +18,14 @@
*/
/**
- * \file QueuedMessage.h
+ * \file SimpleQueuedMessage.h
*/
-#ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_
-#define tests_storePerftools_asyncPerf_QueuedMessage_h_
+#ifndef qpid_broker_SimpleQueuedMessage_h_
+#define qpid_broker_SimpleQueuedMessage_h_
-#include "qpid/broker/AsyncStore.h"
-#include "qpid/broker/EnqueueHandle.h"
+#include "AsyncStore.h"
+#include "EnqueueHandle.h"
#include <boost/enable_shared_from_this.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -33,33 +33,25 @@
namespace qpid {
namespace broker {
-class TxnHandle;
-
-}}
-
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-
class SimpleMessage;
class SimpleQueue;
-class QueuedMessage : public boost::enable_shared_from_this<QueuedMessage>
+class SimpleQueuedMessage : public boost::enable_shared_from_this<SimpleQueuedMessage>
{
public:
- QueuedMessage();
- QueuedMessage(SimpleQueue* q,
+ SimpleQueuedMessage();
+ SimpleQueuedMessage(SimpleQueue* q,
boost::intrusive_ptr<SimpleMessage> msg);
- QueuedMessage(const QueuedMessage& qm);
- QueuedMessage(QueuedMessage* const qm);
- virtual ~QueuedMessage();
+ SimpleQueuedMessage(const SimpleQueuedMessage& qm);
+ SimpleQueuedMessage(SimpleQueuedMessage* const qm);
+ virtual ~SimpleQueuedMessage();
SimpleQueue* getQueue() const;
boost::intrusive_ptr<SimpleMessage> payload() const;
- const qpid::broker::EnqueueHandle& enqHandle() const;
- qpid::broker::EnqueueHandle& enqHandle();
+ const EnqueueHandle& enqHandle() const;
+ EnqueueHandle& enqHandle();
// --- Transaction handling ---
- void prepareEnqueue(qpid::broker::TxnBuffer* tb);
+ void prepareEnqueue(qpid::broker::SimpleTxnBuffer* tb);
void commitEnqueue();
void abortEnqueue();
@@ -69,6 +61,6 @@ private:
qpid::broker::EnqueueHandle m_enqHandle;
};
-}}} // namespace tests::storePerfTools
+}} // namespace qpid::broker
-#endif // tests_storePerftools_asyncPerf_QueuedMessage_h_
+#endif // qpid_broker_SimpleQueuedMessage_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/qpid/broker/SimpleTxnAccept.cpp
index 375cd568d2..343bbb54c7 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
+++ b/cpp/src/qpid/broker/SimpleTxnAccept.cpp
@@ -18,31 +18,30 @@
*/
/**
- * \file TxnAccept.cpp
+ * \file SimpleTxnAccept.cpp
*/
-#include "TxnAccept.h"
+#include "SimpleTxnAccept.h"
-#include "DeliveryRecord.h"
+#include "SimpleDeliveryRecord.h"
#include "qpid/log/Statement.h"
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
-TxnAccept::TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops) :
+SimpleTxnAccept::SimpleTxnAccept(std::deque<boost::shared_ptr<SimpleDeliveryRecord> >& ops) :
m_ops(ops)
{}
-TxnAccept::~TxnAccept() {}
+SimpleTxnAccept::~SimpleTxnAccept() {}
// --- Interface TxnOp ---
bool
-TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() {
+SimpleTxnAccept::prepare(SimpleTxnBuffer* tb) throw() {
try {
- for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ for (std::deque<boost::shared_ptr<SimpleDeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->dequeue(tb);
}
return true;
@@ -55,9 +54,9 @@ TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() {
}
void
-TxnAccept::commit() throw() {
+SimpleTxnAccept::commit() throw() {
try {
- for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) {
+ for (std::deque<boost::shared_ptr<SimpleDeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) {
(*i)->committed();
(*i)->setEnded();
}
@@ -69,6 +68,6 @@ TxnAccept::commit() throw() {
}
void
-TxnAccept::rollback() throw() {}
+SimpleTxnAccept::rollback() throw() {}
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/qpid/broker/SimpleTxnAccept.h
index 5d84289965..eb6963bc88 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
+++ b/cpp/src/qpid/broker/SimpleTxnAccept.h
@@ -18,36 +18,35 @@
*/
/**
- * \file TxnAccept.h
+ * \file SimpleTxnAccept.h
*/
-#ifndef tests_storePerftools_asyncPerf_TxnAccept_h_
-#define tests_storePerftools_asyncPerf_TxnAccept_h_
+#ifndef tests_storePerftools_asyncPerf_SimpleTxnAccept_h_
+#define tests_storePerftools_asyncPerf_SimpleTxnAccept_h_
-#include "qpid/broker/TxnOp.h"
+#include "SimpleTxnOp.h"
#include "boost/shared_ptr.hpp"
#include <deque>
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
-class DeliveryRecord;
+class SimpleDeliveryRecord;
-class TxnAccept: public qpid::broker::TxnOp {
+class SimpleTxnAccept: public SimpleTxnOp {
public:
- TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops);
- virtual ~TxnAccept();
+ SimpleTxnAccept(std::deque<boost::shared_ptr<SimpleDeliveryRecord> >& ops);
+ virtual ~SimpleTxnAccept();
// --- Interface TxnOp ---
- bool prepare(qpid::broker::TxnBuffer* tb) throw();
+ bool prepare(SimpleTxnBuffer* tb) throw();
void commit() throw();
void rollback() throw();
private:
- std::deque<boost::shared_ptr<DeliveryRecord> > m_ops;
+ std::deque<boost::shared_ptr<SimpleDeliveryRecord> > m_ops;
};
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
-#endif // tests_storePerftools_asyncPerf_TxnAccept_h_
+#endif // tests_storePerftools_asyncPerf_SimpleTxnAccept_h_
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp
index 4d6e7b7918..d72a785c2a 100644
--- a/cpp/src/qpid/broker/TxnBuffer.cpp
+++ b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp
@@ -18,14 +18,14 @@
*/
/**
- * \file TxnBuffer.cpp
+ * \file SimpleTxnBuffer.cpp
*/
-#include "TxnBuffer.h"
+#include "SimpleTxnBuffer.h"
#include "AsyncResultHandle.h"
+#include "SimpleTxnOp.h"
#include "TxnAsyncContext.h"
-#include "TxnOp.h"
#include "qpid/log/Statement.h"
@@ -34,9 +34,9 @@
namespace qpid {
namespace broker {
-qpid::sys::Mutex TxnBuffer::s_uuidMutex;
+qpid::sys::Mutex SimpleTxnBuffer::s_uuidMutex;
-TxnBuffer::TxnBuffer(AsyncResultQueue& arq) :
+SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq) :
m_store(0),
m_resultQueue(arq),
m_tpcFlag(false),
@@ -47,7 +47,7 @@ TxnBuffer::TxnBuffer(AsyncResultQueue& arq) :
createLocalXid();
}
-TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) :
+SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid) :
m_store(0),
m_resultQueue(arq),
m_xid(xid),
@@ -61,31 +61,31 @@ TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) :
}
}
-TxnBuffer::~TxnBuffer() {}
+SimpleTxnBuffer::~SimpleTxnBuffer() {}
TxnHandle&
-TxnBuffer::getTxnHandle() {
+SimpleTxnBuffer::getTxnHandle() {
return m_txnHandle;
}
const std::string&
-TxnBuffer::getXid() const {
+SimpleTxnBuffer::getXid() const {
return m_xid;
}
bool
-TxnBuffer::is2pc() const {
+SimpleTxnBuffer::is2pc() const {
return m_tpcFlag;
}
void
-TxnBuffer::incrOpCnt() {
+SimpleTxnBuffer::incrOpCnt() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_submitOpCntMutex);
++m_submitOpCnt;
}
void
-TxnBuffer::decrOpCnt() {
+SimpleTxnBuffer::decrOpCnt() {
const uint32_t numOps = getNumOps();
qpid::sys::ScopedLock<qpid::sys::Mutex> l2(m_completeOpCntMutex);
qpid::sys::ScopedLock<qpid::sys::Mutex> l3(m_submitOpCntMutex);
@@ -99,15 +99,15 @@ TxnBuffer::decrOpCnt() {
}
void
-TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) {
+SimpleTxnBuffer::enlist(boost::shared_ptr<SimpleTxnOp> op) {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
m_ops.push_back(op);
}
bool
-TxnBuffer::prepare() {
+SimpleTxnBuffer::prepare() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
- for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
if (!(*i)->prepare(this)) {
return false;
}
@@ -116,25 +116,25 @@ TxnBuffer::prepare() {
}
void
-TxnBuffer::commit() {
+SimpleTxnBuffer::commit() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
- for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->commit();
}
m_ops.clear();
}
void
-TxnBuffer::rollback() {
+SimpleTxnBuffer::rollback() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
- for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ for(std::vector<boost::shared_ptr<SimpleTxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->rollback();
}
m_ops.clear();
}
bool
-TxnBuffer::commitLocal(AsyncTransactionalStore* const store) {
+SimpleTxnBuffer::commitLocal(AsyncTransactionalStore* const store) {
try {
m_store = store;
asyncLocalCommit();
@@ -147,7 +147,7 @@ TxnBuffer::commitLocal(AsyncTransactionalStore* const store) {
}
void
-TxnBuffer::asyncLocalCommit() {
+SimpleTxnBuffer::asyncLocalCommit() {
switch(m_state) {
case NONE:
m_state = PREPARE;
@@ -180,7 +180,7 @@ TxnBuffer::asyncLocalCommit() {
//static
void
-TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) {
+SimpleTxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) {
if (arh) {
boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
if (arh->getErrNo()) {
@@ -194,7 +194,7 @@ TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) {
}
void
-TxnBuffer::asyncLocalAbort() {
+SimpleTxnBuffer::asyncLocalAbort() {
assert(m_store != 0);
switch (m_state) {
case NONE:
@@ -218,7 +218,7 @@ TxnBuffer::asyncLocalAbort() {
//static
void
-TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) {
+SimpleTxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) {
if (arh) {
boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
if (arh->getErrNo()) {
@@ -231,14 +231,14 @@ TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) {
// private
uint32_t
-TxnBuffer::getNumOps() const {
+SimpleTxnBuffer::getNumOps() const {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
return m_ops.size();
}
// private
void
-TxnBuffer::createLocalXid()
+SimpleTxnBuffer::createLocalXid()
{
uuid_t uuid;
{
diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/SimpleTxnBuffer.h
index 02569f6545..b2164cfeed 100644
--- a/cpp/src/qpid/broker/TxnBuffer.h
+++ b/cpp/src/qpid/broker/SimpleTxnBuffer.h
@@ -18,11 +18,11 @@
*/
/**
- * \file TxnBuffer.h
+ * \file SimpleTxnBuffer.h
*/
-#ifndef qpid_broker_TxnBuffer_h_
-#define qpid_broker_TxnBuffer_h_
+#ifndef qpid_broker_SimpleTxnBuffer_h_
+#define qpid_broker_SimpleTxnBuffer_h_
#include "TxnHandle.h"
@@ -37,20 +37,20 @@ namespace broker {
class AsyncResultHandle;
class AsyncResultQueue;
class AsyncTransactionalStore;
-class TxnOp;
+class SimpleTxnOp;
-class TxnBuffer {
+class SimpleTxnBuffer {
public:
- TxnBuffer(AsyncResultQueue& arq);
- TxnBuffer(AsyncResultQueue& arq, std::string& xid);
- virtual ~TxnBuffer();
+ SimpleTxnBuffer(AsyncResultQueue& arq);
+ SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid);
+ virtual ~SimpleTxnBuffer();
TxnHandle& getTxnHandle();
const std::string& getXid() const;
bool is2pc() const;
void incrOpCnt();
void decrOpCnt();
- void enlist(boost::shared_ptr<TxnOp> op);
+ void enlist(boost::shared_ptr<SimpleTxnOp> op);
bool prepare();
void commit();
void rollback();
@@ -68,7 +68,7 @@ private:
mutable qpid::sys::Mutex m_completeOpCntMutex;
static qpid::sys::Mutex s_uuidMutex;
- std::vector<boost::shared_ptr<TxnOp> > m_ops;
+ std::vector<boost::shared_ptr<SimpleTxnOp> > m_ops;
TxnHandle m_txnHandle;
AsyncTransactionalStore* m_store;
AsyncResultQueue& m_resultQueue;
@@ -86,4 +86,4 @@ private:
}} // namespace qpid::broker
-#endif // qpid_broker_TxnBuffer_h_
+#endif // qpid_broker_SimpleTxnBuffer_h_
diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/SimpleTxnOp.h
index bcff87551c..2cec2da8f0 100644
--- a/cpp/src/qpid/broker/TxnOp.h
+++ b/cpp/src/qpid/broker/SimpleTxnOp.h
@@ -18,27 +18,27 @@
*/
/**
- * \file TxnOp.h
+ * \file SimpleTxnOp.h
*/
-#ifndef qpid_broker_TxnOp_h_
-#define qpid_broker_TxnOp_h_
+#ifndef qpid_broker_SimpleTxnOp_h_
+#define qpid_broker_SimpleTxnOp_h_
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
-class TxnBuffer;
+class SimpleTxnBuffer;
-class TxnOp{
+class SimpleTxnOp{
public:
- virtual ~TxnOp() {}
- virtual bool prepare(qpid::broker::TxnBuffer*) throw() = 0;
+ virtual ~SimpleTxnOp() {}
+ virtual bool prepare(SimpleTxnBuffer*) throw() = 0;
virtual void commit() throw() = 0;
virtual void rollback() throw() = 0;
};
}} // namespace qpid::broker
-#endif // qpid_broker_TxnOp_h_
+#endif // qpid_broker_SimpleTxnOp_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/qpid/broker/SimpleTxnPublish.cpp
index cc36a38be7..6ad6a108ea 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
+++ b/cpp/src/qpid/broker/SimpleTxnPublish.cpp
@@ -18,30 +18,29 @@
*/
/**
- * \file TxnPublish.cpp
+ * \file SimpleTxnPublish.cpp
*/
-#include "TxnPublish.h"
+#include "SimpleTxnPublish.h"
-#include "QueuedMessage.h"
#include "SimpleMessage.h"
#include "SimpleQueue.h"
+#include "SimpleQueuedMessage.h"
#include "qpid/log/Statement.h"
#include <boost/make_shared.hpp>
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
+namespace qpid {
+namespace broker {
-TxnPublish::TxnPublish(boost::intrusive_ptr<SimpleMessage> msg) :
+SimpleTxnPublish::SimpleTxnPublish(boost::intrusive_ptr<SimpleMessage> msg) :
m_msg(msg)
{}
-TxnPublish::~TxnPublish() {}
+SimpleTxnPublish::~SimpleTxnPublish() {}
bool
-TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() {
+SimpleTxnPublish::prepare(SimpleTxnBuffer* tb) throw() {
try {
while (!m_queues.empty()) {
m_queues.front()->prepareEnqueue(tb);
@@ -58,9 +57,9 @@ TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() {
}
void
-TxnPublish::commit() throw() {
+SimpleTxnPublish::commit() throw() {
try {
- for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
+ for (std::list<boost::shared_ptr<SimpleQueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->commitEnqueue();
}
} catch (const std::exception& e) {
@@ -71,9 +70,9 @@ TxnPublish::commit() throw() {
}
void
-TxnPublish::rollback() throw() {
+SimpleTxnPublish::rollback() throw() {
try {
- for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
+ for (std::list<boost::shared_ptr<SimpleQueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
(*i)->abortEnqueue();
}
} catch (const std::exception& e) {
@@ -84,19 +83,19 @@ TxnPublish::rollback() throw() {
}
uint64_t
-TxnPublish::contentSize() {
+SimpleTxnPublish::contentSize() {
return m_msg->contentSize();
}
void
-TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) {
- m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg)));
+SimpleTxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) {
+ m_queues.push_back(boost::shared_ptr<SimpleQueuedMessage>(new SimpleQueuedMessage(queue.get(), m_msg)));
m_delivered = true;
}
SimpleMessage&
-TxnPublish::getMessage() {
+SimpleTxnPublish::getMessage() {
return *m_msg;
}
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h b/cpp/src/qpid/broker/SimpleTxnPublish.h
index eae9ef9c4c..0aaf8e4ba0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
+++ b/cpp/src/qpid/broker/SimpleTxnPublish.h
@@ -18,37 +18,36 @@
*/
/**
- * \file TxnPublish.h
+ * \file SimpleTxnPublish.h
*/
-#ifndef tests_storePerftools_asyncPerf_TxnPublish_h_
-#define tests_storePerftools_asyncPerf_TxnPublish_h_
+#ifndef qpid_broker_SimpleTxnPublish_h_
+#define qpid_broker_SimpleTxnPublish_h_
-#include "Deliverable.h"
-
-#include "qpid/broker/TxnOp.h"
+#include "SimpleDeliverable.h"
+#include "SimpleTxnOp.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <list>
-namespace tests {
-namespace storePerftools {
-namespace asyncPerf {
-class QueuedMessage;
+namespace qpid {
+namespace broker {
+
+class SimpleQueuedMessage;
class SimpleMessage;
class SimpleQueue;
-class TxnPublish : public qpid::broker::TxnOp,
- public Deliverable
+class SimpleTxnPublish : public SimpleTxnOp,
+ public SimpleDeliverable
{
public:
- TxnPublish(boost::intrusive_ptr<SimpleMessage> msg);
- virtual ~TxnPublish();
+ SimpleTxnPublish(boost::intrusive_ptr<SimpleMessage> msg);
+ virtual ~SimpleTxnPublish();
// --- Interface TxOp ---
- bool prepare(qpid::broker::TxnBuffer* tb) throw();
+ bool prepare(SimpleTxnBuffer* tb) throw();
void commit() throw();
void rollback() throw();
@@ -59,10 +58,10 @@ public:
private:
boost::intrusive_ptr<SimpleMessage> m_msg;
- std::list<boost::shared_ptr<QueuedMessage> > m_queues;
- std::list<boost::shared_ptr<QueuedMessage> > m_prepared;
+ std::list<boost::shared_ptr<SimpleQueuedMessage> > m_queues;
+ std::list<boost::shared_ptr<SimpleQueuedMessage> > m_prepared;
};
-}}} // namespace tests::storePerftools::asyncPerf
+}} // namespace qpid::broker
-#endif // tests_storePerftools_asyncPerf_TxnPublish_h_
+#endif // qpid_broker_SimpleTxnPublish_h_
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp
index 63e2de2b41..527cb4741f 100644
--- a/cpp/src/qpid/broker/TxnAsyncContext.cpp
+++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp
@@ -26,7 +26,7 @@
namespace qpid {
namespace broker {
-TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb,
+TxnAsyncContext::TxnAsyncContext(SimpleTxnBuffer* const tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq):
m_tb(tb),
@@ -37,7 +37,7 @@ TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb,
TxnAsyncContext::~TxnAsyncContext()
{}
-TxnBuffer*
+SimpleTxnBuffer*
TxnAsyncContext::getTxnBuffer() const
{
return m_tb;
diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h
index 9c617238e8..04f6ef76f5 100644
--- a/cpp/src/qpid/broker/TxnAsyncContext.h
+++ b/cpp/src/qpid/broker/TxnAsyncContext.h
@@ -29,38 +29,34 @@
#include "qpid/asyncStore/AsyncOperation.h"
namespace qpid {
-//namespace asyncStore {
-//class AsyncOperation;
-//}
namespace broker {
class AsyncResultHandle;
class AsyncResultQueue;
-//class TxnHandle;
typedef void (*AsyncResultCallback)(const AsyncResultHandle* const);
class TxnAsyncContext: public BrokerAsyncContext {
public:
- TxnAsyncContext(TxnBuffer* const tb,
+ TxnAsyncContext(SimpleTxnBuffer* const tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq);
virtual ~TxnAsyncContext();
- TxnBuffer* getTxnBuffer() const;
+ SimpleTxnBuffer* getTxnBuffer() const;
// --- Interface BrokerAsyncContext ---
AsyncResultQueue* getAsyncResultQueue() const;
void invokeCallback(const AsyncResultHandle* const) const;
private:
- TxnBuffer* const m_tb;
+ SimpleTxnBuffer* const m_tb;
AsyncResultCallback m_rcb;
AsyncResultQueue* const m_arq;
};
class TpcTxnAsyncContext : public TxnAsyncContext {
public:
- TpcTxnAsyncContext(TxnBuffer* const tb,
+ TpcTxnAsyncContext(SimpleTxnBuffer* const tb,
AsyncResultCallback rcb,
AsyncResultQueue* const arq) :
TxnAsyncContext(tb, rcb, arq)
diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake
index 94631bb8ea..3dcd81c3d7 100644
--- a/cpp/src/tests/asyncstore.cmake
+++ b/cpp/src/tests/asyncstore.cmake
@@ -51,20 +51,20 @@ endif (UNIX)
# Async store perf test (asyncPerf)
set (asyncStorePerf_SOURCES
- storePerftools/asyncPerf/Deliverable.cpp
- storePerftools/asyncPerf/DeliveryRecord.cpp
- storePerftools/asyncPerf/MessageAsyncContext.cpp
storePerftools/asyncPerf/MessageConsumer.cpp
- storePerftools/asyncPerf/MessageDeque.cpp
storePerftools/asyncPerf/MessageProducer.cpp
storePerftools/asyncPerf/PerfTest.cpp
- storePerftools/asyncPerf/QueuedMessage.cpp
- storePerftools/asyncPerf/SimpleMessage.cpp
- storePerftools/asyncPerf/SimpleQueue.cpp
+# storePerftools/asyncPerf/SimpleDeliverable.cpp
+# storePerftools/asyncPerf/SimpleDeliveryRecord.cpp
+# storePerftools/asyncPerf/SimpleMessage.cpp
+# storePerftools/asyncPerf/SimpleMessageAsyncContext.cpp
+# storePerftools/asyncPerf/SimpleMessageDeque.cpp
+# storePerftools/asyncPerf/SimpleQueue.cpp
+# storePerftools/asyncPerf/SimpleQueuedMessage.cpp
+# storePerftools/asyncPerf/SimpleTxnAccept.cpp
+# storePerftools/asyncPerf/SimpleTxnPublish.cpp
storePerftools/asyncPerf/TestOptions.cpp
storePerftools/asyncPerf/TestResult.cpp
- storePerftools/asyncPerf/TxnAccept.cpp
- storePerftools/asyncPerf/TxnPublish.cpp
storePerftools/common/Parameters.cpp
storePerftools/common/PerftoolError.cpp
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 6aa477c470..6477696bd6 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -23,13 +23,12 @@
#include "MessageConsumer.h"
-#include "DeliveryRecord.h"
-#include "SimpleQueue.h"
#include "TestOptions.h"
-#include "TxnAccept.h"
-#include "qpid/asyncStore/AsyncStoreImpl.h"
-#include "qpid/broker/TxnBuffer.h"
+#include "qpid/broker/SimpleDeliveryRecord.h"
+#include "qpid/broker/SimpleQueue.h"
+#include "qpid/broker/SimpleTxnAccept.h"
+#include "qpid/broker/SimpleTxnBuffer.h"
#include <stdint.h> // uint32_t
@@ -38,9 +37,9 @@ namespace storePerftools {
namespace asyncPerf {
MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimpleQueue> queue) :
+ boost::shared_ptr<qpid::broker::SimpleQueue> queue) :
m_perfTestParams(perfTestParams),
m_store(store),
m_resultQueue(arq),
@@ -50,7 +49,7 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
MessageConsumer::~MessageConsumer() {}
void
-MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) {
+MessageConsumer::record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr) {
m_unacked.push_back(dr);
}
@@ -61,9 +60,9 @@ void*
MessageConsumer::runConsumers() {
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
uint16_t opsInTxnCnt = 0U;
- qpid::broker::TxnBuffer* tb = 0;
+ qpid::broker::SimpleTxnBuffer* tb = 0;
if (useTxns) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
uint32_t msgsPerConsumer = m_perfTestParams.m_numEnqThreadsPerQueue * m_perfTestParams.m_numMsgs /
@@ -74,19 +73,19 @@ MessageConsumer::runConsumers() {
++numMsgs;
if (useTxns) {
// --- Transactional dequeue ---
- boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
+ boost::shared_ptr<qpid::broker::SimpleTxnAccept> ta(new qpid::broker::SimpleTxnAccept(m_unacked));
m_unacked.clear();
tb->enlist(ta);
if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
tb->commitLocal(m_store);
if (numMsgs < m_perfTestParams.m_numMsgs) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
opsInTxnCnt = 0U;
}
} else {
// --- Non-transactional dequeue ---
- for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) {
+ for (std::deque<boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) {
(*i)->accept();
}
m_unacked.clear();
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
index b110520889..d5a881f7e0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
@@ -24,44 +24,44 @@
#ifndef tests_storePerftools_asyncPerf_MessageConsumer_h_
#define tests_storePerftools_asyncPerf_MessageConsumer_h_
+#include "qpid/broker/SimpleConsumer.h"
+
#include "boost/shared_ptr.hpp"
#include <deque>
namespace qpid {
-namespace asyncStore {
-class AsyncStoreImpl;
-}
namespace broker {
class AsyncResultQueue;
+class AsyncStore;
+class SimpleDeliveryRecord;
+class SimpleQueue;
}}
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class DeliveryRecord;
-class SimpleQueue;
class TestOptions;
-class MessageConsumer
+class MessageConsumer: public qpid::broker::SimpleConsumer
{
public:
MessageConsumer(const TestOptions& perfTestParams,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimpleQueue> queue);
+ boost::shared_ptr<qpid::broker::SimpleQueue> queue);
virtual ~MessageConsumer();
- void record(boost::shared_ptr<DeliveryRecord> dr);
+ void record(boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> dr);
void commitComplete();
void* runConsumers();
static void* startConsumers(void* ptr);
private:
const TestOptions& m_perfTestParams;
- qpid::asyncStore::AsyncStoreImpl* m_store;
+ qpid::broker::AsyncStore* m_store;
qpid::broker::AsyncResultQueue& m_resultQueue;
- boost::shared_ptr<SimpleQueue> m_queue;
- std::deque<boost::shared_ptr<DeliveryRecord> > m_unacked;
+ boost::shared_ptr<qpid::broker::SimpleQueue> m_queue;
+ std::deque<boost::shared_ptr<qpid::broker::SimpleDeliveryRecord> > m_unacked;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
index 974f3f3981..f88d305a38 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
@@ -23,13 +23,12 @@
#include "MessageProducer.h"
-#include "SimpleMessage.h"
-#include "SimpleQueue.h"
#include "TestOptions.h"
-#include "TxnPublish.h"
-#include "qpid/asyncStore/AsyncStoreImpl.h"
-#include "qpid/broker/TxnBuffer.h"
+#include "qpid/broker/SimpleMessage.h"
+#include "qpid/broker/SimpleQueue.h"
+#include "qpid/broker/SimpleTxnBuffer.h"
+#include "qpid/broker/SimpleTxnPublish.h"
#include <stdint.h> // uint32_t
@@ -39,9 +38,9 @@ namespace asyncPerf {
MessageProducer::MessageProducer(const TestOptions& perfTestParams,
const char* msgData,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimpleQueue> queue) :
+ boost::shared_ptr<qpid::broker::SimpleQueue> queue) :
m_perfTestParams(perfTestParams),
m_msgData(msgData),
m_store(store),
@@ -55,14 +54,14 @@ void*
MessageProducer::runProducers() {
const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U;
uint16_t recsInTxnCnt = 0U;
- qpid::broker::TxnBuffer* tb = 0;
+ qpid::broker::SimpleTxnBuffer* tb = 0;
if (useTxns) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) {
- boost::intrusive_ptr<SimpleMessage> msg(new SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store));
+ boost::intrusive_ptr<qpid::broker::SimpleMessage> msg(new qpid::broker::SimpleMessage(m_msgData, m_perfTestParams.m_msgSize, m_store));
if (useTxns) {
- boost::shared_ptr<TxnPublish> op(new TxnPublish(msg));
+ boost::shared_ptr<qpid::broker::SimpleTxnPublish> op(new qpid::broker::SimpleTxnPublish(msg));
op->deliverTo(m_queue);
tb->enlist(op);
if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) {
@@ -72,7 +71,7 @@ MessageProducer::runProducers() {
// transaction until the current commit cycle completes. So use another instance. This
// instance should auto-delete when the async commit cycle completes.
if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) {
- tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ tb = new qpid::broker::SimpleTxnBuffer(m_resultQueue);
}
recsInTxnCnt = 0U;
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
index 127408e3db..6f98d03503 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
@@ -27,19 +27,17 @@
#include "boost/shared_ptr.hpp"
namespace qpid {
-namespace asyncStore {
-class AsyncStoreImpl;
-}
namespace broker {
class AsyncResultQueue;
-class TxnBuffer;
+class AsyncStore;
+class SimpleQueue;
+class SimpleTxnBuffer;
}}
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimpleQueue;
class TestOptions;
class MessageProducer
@@ -47,18 +45,18 @@ class MessageProducer
public:
MessageProducer(const TestOptions& perfTestParams,
const char* msgData,
- qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncStore* store,
qpid::broker::AsyncResultQueue& arq,
- boost::shared_ptr<SimpleQueue> queue);
+ boost::shared_ptr<qpid::broker::SimpleQueue> queue);
virtual ~MessageProducer();
void* runProducers();
static void* startProducers(void* ptr);
private:
const TestOptions& m_perfTestParams;
const char* m_msgData;
- qpid::asyncStore::AsyncStoreImpl* m_store;
+ qpid::broker::AsyncStore* m_store;
qpid::broker::AsyncResultQueue& m_resultQueue;
- boost::shared_ptr<SimpleQueue> m_queue;
+ boost::shared_ptr<qpid::broker::SimpleQueue> m_queue;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index 6377cc0d85..c05eb0487d 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -25,7 +25,6 @@
#include "MessageConsumer.h"
#include "MessageProducer.h"
-#include "SimpleQueue.h"
#include "tests/storePerftools/version.h"
#include "tests/storePerftools/common/ScopedTimer.h"
@@ -34,6 +33,7 @@
#include "qpid/Modules.h" // Use with loading store as module
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/asyncStore/AsyncStoreOptions.h"
+#include "qpid/broker/SimpleQueue.h"
#include <iomanip>
@@ -55,8 +55,7 @@ PerfTest::PerfTest(const TestOptions& to,
std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize);
}
-PerfTest::~PerfTest()
-{
+PerfTest::~PerfTest() {
m_poller->shutdown();
m_pollingThread.join();
@@ -68,8 +67,7 @@ PerfTest::~PerfTest()
}
void
-PerfTest::run()
-{
+PerfTest::run() {
if (m_testOpts.m_durable) {
prepareStore();
}
@@ -113,8 +111,7 @@ PerfTest::run()
}
void
-PerfTest::toStream(std::ostream& os) const
-{
+PerfTest::toStream(std::ostream& os) const {
m_testOpts.printVals(os);
os << std::endl;
m_storeOpts.printVals(os);
@@ -124,16 +121,15 @@ PerfTest::toStream(std::ostream& os) const
// private
void
-PerfTest::prepareStore()
-{
- m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts);
- m_store->initialize();
+PerfTest::prepareStore() {
+ qpid::asyncStore::AsyncStoreImpl* s = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts);
+ s->initialize();
+ m_store = s;
}
// private
void
-PerfTest::destroyStore()
-{
+PerfTest::destroyStore() {
if (m_store) {
delete m_store;
}
@@ -141,12 +137,11 @@ PerfTest::destroyStore()
// private
void
-PerfTest::prepareQueues()
-{
+PerfTest::prepareQueues() {
for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
std::ostringstream qname;
qname << "queue_" << std::setw(4) << std::setfill('0') << i;
- boost::shared_ptr<SimpleQueue> mpq(new SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue));
+ boost::shared_ptr<qpid::broker::SimpleQueue> mpq(new qpid::broker::SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue));
mpq->asyncCreate();
m_queueList.push_back(mpq);
}
@@ -154,8 +149,7 @@ PerfTest::prepareQueues()
// private
void
-PerfTest::destroyQueues()
-{
+PerfTest::destroyQueues() {
while (m_queueList.size() > 0) {
m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion);
m_queueList.pop_front();
@@ -163,8 +157,7 @@ PerfTest::destroyQueues()
}
int
-runPerfTest(int argc, char** argv)
-{
+runPerfTest(int argc, char** argv) {
// Load async store module
qpid::tryShlib ("asyncStore.so", false);
@@ -212,7 +205,6 @@ runPerfTest(int argc, char** argv)
// -----------------------------------------------------------------
int
-main(int argc, char** argv)
-{
+main(int argc, char** argv) {
return tests::storePerftools::asyncPerf::runPerfTest(argc, argv);
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
index e4d99021b5..27d8d08faf 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
@@ -37,9 +37,11 @@
namespace qpid {
namespace asyncStore {
-class AsyncStoreImpl;
class AsyncStoreOptions;
}
+namespace broker {
+class SimpleQueue;
+}
namespace sys {
class Poller;
}}
@@ -48,7 +50,6 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class SimpleQueue;
class MessageConsumer;
class MessageProducer;
class TestOptions;
@@ -73,8 +74,8 @@ private:
boost::shared_ptr<qpid::sys::Poller> m_poller;
qpid::sys::Thread m_pollingThread;
qpid::broker::AsyncResultQueueImpl m_resultQueue;
- qpid::asyncStore::AsyncStoreImpl* m_store;
- std::deque<boost::shared_ptr<SimpleQueue> > m_queueList;
+ qpid::broker::AsyncStore* m_store;
+ std::deque<boost::shared_ptr<qpid::broker::SimpleQueue> > m_queueList;
std::deque<boost::shared_ptr<MessageProducer> > m_producers;
std::deque<boost::shared_ptr<MessageConsumer> > m_consumers;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
index 8c1f2976bf..20e9c39f1b 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
@@ -62,12 +62,10 @@ TestOptions::TestOptions(const uint32_t numMsgs,
doAddOptions();
}
-TestOptions::~TestOptions()
-{}
+TestOptions::~TestOptions() {}
void
-TestOptions::printVals(std::ostream& os) const
-{
+TestOptions::printVals(std::ostream& os) const {
tests::storePerftools::common::TestOptions::printVals(os);
os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl;
os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl;
@@ -77,8 +75,7 @@ TestOptions::printVals(std::ostream& os) const
// private
void
-TestOptions::doAddOptions()
-{
+TestOptions::doAddOptions() {
addOptions()
("enq-txn-size,t", qpid::optValue(m_enqTxnBlockSize, "N"),
"Num enqueus per transaction (0 = no transactions)")
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp
index cf6f293494..312fa187b8 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TestResult.cpp
@@ -32,12 +32,10 @@ TestResult::TestResult(const TestOptions& to) :
m_testOpts(to)
{}
-TestResult::~TestResult()
-{}
+TestResult::~TestResult() {}
void
-TestResult::toStream(std::ostream& os) const
-{
+TestResult::toStream(std::ostream& os) const {
double msgsRate;
os << "TEST RESULTS:" << std::endl;
os << " Msgs per thread: " << m_testOpts.m_numMsgs << std::endl;