summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-01 15:30:01 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-01 15:30:01 +0000
commitad9bebb1157f009151973cf721fdebdd663d39e3 (patch)
tree3b8dc0a9fa3de3b88bcbb82572a06cb579fa3002
parent220841d24ff48f27339000e887d5465a53c39013 (diff)
downloadqpid-python-ad9bebb1157f009151973cf721fdebdd663d39e3.tar.gz
WIP: Non-transactional message path in place. Transactions not working.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1345240 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/CMakeLists.txt3
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp32
-rw-r--r--cpp/src/qpid/asyncStore/AsyncStoreImpl.h6
-rw-r--r--cpp/src/qpid/asyncStore/OperationQueue.cpp6
-rw-r--r--cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h85
-rw-r--r--cpp/src/qpid/broker/AsyncStore.cpp3
-rw-r--r--cpp/src/qpid/broker/AsyncStore.h11
-rw-r--r--cpp/src/qpid/broker/BrokerAsyncContext.h15
-rw-r--r--cpp/src/qpid/broker/MessageHandle.h7
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.cpp48
-rw-r--r--cpp/src/qpid/broker/PersistableMessage.h18
-rw-r--r--cpp/src/tests/CMakeLists.txt4
-rw-r--r--cpp/src/tests/QueueTest.cpp4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h90
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp26
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h28
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp67
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h52
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp64
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h59
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp69
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h62
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/Messages.h52
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp58
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h20
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp344
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h102
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp88
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h13
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp48
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.h13
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp26
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h22
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp64
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h30
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp18
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TestOptions.h6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TestResult.h6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp32
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h40
-rw-r--r--cpp/src/tests/storePerftools/common/Thread.h2
41 files changed, 1308 insertions, 435 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 8246050f96..06608128bc 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1078,7 +1078,6 @@ set (qpidbroker_SOURCES
qpid/amqp_0_10/Connection.cpp
qpid/broker/AsyncStore.cpp
qpid/broker/Broker.cpp
- qpid/broker/BrokerAsyncContext.h
qpid/broker/Credit.cpp
qpid/broker/Exchange.cpp
qpid/broker/ExpiryPolicy.cpp
@@ -1487,7 +1486,9 @@ set (jrnl2_SOURCES
# AsyncStore source files
set (asyncStore_SOURCES
+ qpid/asyncStore/AsyncOpCounter.cpp
qpid/asyncStore/AsyncOperation.cpp
+ qpid/asyncStore/AsyncStoreHandleImpl.cpp
qpid/asyncStore/AsyncStoreImpl.cpp
qpid/asyncStore/AsyncStoreOptions.cpp
qpid/asyncStore/ConfigHandleImpl.cpp
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
index 5a4905fef6..083034acc4 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
@@ -246,9 +246,9 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
void
AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
- qpid::broker::TxnHandle& txnHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
+ qpid::broker::TxnHandle& txnHandle,
+ qpid::broker::ResultCallback resultCb,
+ qpid::broker::BrokerAsyncContext* brokerCtxt)
{
AsyncOperation* op = new AsyncOperation(AsyncOperation::EVENT_DESTROY,
dynamic_cast<qpid::broker::IdHandle*>(&eventHandle),
@@ -260,18 +260,6 @@ AsyncStoreImpl::submitDestroy(qpid::broker::EventHandle& eventHandle,
void
AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
-{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_ENQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
- resultCb,
- brokerCtxt);
- m_operations.submit(op);
-}
-
-void
-AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
qpid::broker::ResultCallback resultCb,
qpid::broker::BrokerAsyncContext* brokerCtxt)
@@ -282,18 +270,8 @@ AsyncStoreImpl::submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
resultCb,
brokerCtxt);
m_operations.submit(op);
-}
-
-void
-AsyncStoreImpl::submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt)
-{
- AsyncOperation* op = new AsyncOperation(AsyncOperation::MSG_DEQUEUE,
- dynamic_cast<qpid::broker::IdHandle*>(&enqHandle),
- resultCb,
- brokerCtxt);
- m_operations.submit(op);
+//delete op;
+//delete brokerCtxt;
}
void
diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
index 7e3b3e94da..0298c74dc5 100644
--- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
+++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
@@ -111,16 +111,10 @@ public:
qpid::broker::BrokerAsyncContext* brokerCtxt);
void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
- void submitEnqueue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
qpid::broker::ResultCallback resultCb,
qpid::broker::BrokerAsyncContext* brokerCtxt);
void submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
- qpid::broker::ResultCallback resultCb,
- qpid::broker::BrokerAsyncContext* brokerCtxt);
- void submitDequeue(qpid::broker::EnqueueHandle& enqHandle,
qpid::broker::TxnHandle& txnHandle,
qpid::broker::ResultCallback resultCb,
qpid::broker::BrokerAsyncContext* brokerCtxt);
diff --git a/cpp/src/qpid/asyncStore/OperationQueue.cpp b/cpp/src/qpid/asyncStore/OperationQueue.cpp
index 1e52eb3612..69ddf7645e 100644
--- a/cpp/src/qpid/asyncStore/OperationQueue.cpp
+++ b/cpp/src/qpid/asyncStore/OperationQueue.cpp
@@ -23,8 +23,6 @@
#include "OperationQueue.h"
-#include "qpid/broker/BrokerAsyncContext.h"
-
namespace qpid {
namespace asyncStore {
@@ -42,7 +40,7 @@ OperationQueue::~OperationQueue()
void
OperationQueue::submit(const AsyncOperation* op)
{
-//std::cout << "***** OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
+//std::cout << "--> OperationQueue::submit() op=" << op->getOpStr() << std::endl << std::flush;
m_opQueue.push(op);
}
@@ -51,7 +49,7 @@ OperationQueue::OpQueue::Batch::const_iterator
OperationQueue::handle(const OperationQueue::OpQueue::Batch& e)
{
for (OpQueue::Batch::const_iterator i = e.begin(); i != e.end(); ++i) {
-//std::cout << "##### OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
+//std::cout << "<-- OperationQueue::handle() Op=" << (*i)->getOpStr() << std::endl << std::flush;
if ((*i)->m_resCb) {
((*i)->m_resCb)(new qpid::broker::AsyncResult, (*i)->m_brokerCtxt);
} else {
diff --git a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
index b13caa5462..207bbc68f2 100644
--- a/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
+++ b/cpp/src/qpid/asyncStore/jrnl2/AtomicCounter.h
@@ -40,7 +40,7 @@ public:
/**
* \brief Constructor with an option to set an inital value for the counter.
*/
- AtomicCounter(T initialValue = T(0)) :
+ AtomicCounter(const T initialValue = T(0)) :
m_cnt(initialValue)
{}
@@ -58,13 +58,90 @@ public:
* first call to next() will return 1. Upon overflow, the counter will be incremented twice so as to avoid
* returning the value 0.
*/
- virtual T next()
+ T
+ next()
{
- // --- START OF CRITICAL SECTION ---
ScopedLock l(m_mutex);
while (!++m_cnt) ; // Cannot return 0x0 if m_cnt should overflow
return m_cnt;
- } // --- END OF CRITICAL SECTION ---
+ }
+
+ void
+ operator++()
+ {
+ ScopedLock l(m_mutex);
+ ++m_cnt;
+ }
+
+ void
+ operator--()
+ {
+ ScopedLock l(m_mutex);
+ --m_cnt;
+ }
+
+ T
+ get() const
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt;
+ }
+
+ bool
+ operator==(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt == rhs.get();
+ }
+
+ bool
+ operator==(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt == rhs;
+ }
+
+ bool
+ operator!=(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt != rhs.get();
+ }
+
+ bool
+ operator!=(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt != rhs;
+ }
+
+ bool
+ operator>(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt > rhs.get();
+ }
+
+ bool
+ operator>(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt > rhs;
+ }
+
+ bool
+ operator<(const AtomicCounter<T>& rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt < rhs.get();
+ }
+
+ bool
+ operator<(const T rhs)
+ {
+ ScopedLock l(m_mutex);
+ return m_cnt < rhs;
+ }
protected:
T m_cnt; ///< Internal count value
diff --git a/cpp/src/qpid/broker/AsyncStore.cpp b/cpp/src/qpid/broker/AsyncStore.cpp
index ff3e77dba5..649049bf41 100644
--- a/cpp/src/qpid/broker/AsyncStore.cpp
+++ b/cpp/src/qpid/broker/AsyncStore.cpp
@@ -22,6 +22,9 @@
namespace qpid {
namespace broker {
+BrokerAsyncContext::~BrokerAsyncContext()
+{}
+
DataSource::~DataSource()
{}
diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h
index 15e9120edb..eb47d62cf0 100644
--- a/cpp/src/qpid/broker/AsyncStore.h
+++ b/cpp/src/qpid/broker/AsyncStore.h
@@ -28,11 +28,14 @@
#include <string>
namespace qpid {
-
namespace broker {
-// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting:
-class BrokerAsyncContext;
+// Defined by broker, implements qpid::messaging::Handle-type template to hide ref counting
+// Subclass this for specific contexts
+class BrokerAsyncContext {
+public:
+ virtual ~BrokerAsyncContext();
+};
// Subclassed by broker:
class DataSource {
@@ -96,9 +99,7 @@ public:
virtual void submitDestroy(EventHandle&, ResultCallback, BrokerAsyncContext*) = 0;
virtual void submitDestroy(EventHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitEnqueue(EnqueueHandle&, ResultCallback, BrokerAsyncContext*) = 0;
virtual void submitEnqueue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
- virtual void submitDequeue(EnqueueHandle&, ResultCallback, BrokerAsyncContext*) = 0;
virtual void submitDequeue(EnqueueHandle&, TxnHandle&, ResultCallback, BrokerAsyncContext*) = 0;
// Legacy - Restore FTD message, is NOT async!
diff --git a/cpp/src/qpid/broker/BrokerAsyncContext.h b/cpp/src/qpid/broker/BrokerAsyncContext.h
deleted file mode 100644
index 38d53a84f1..0000000000
--- a/cpp/src/qpid/broker/BrokerAsyncContext.h
+++ /dev/null
@@ -1,15 +0,0 @@
-#ifndef qpid_broker_BrokerContext_hpp_
-#define qpid_broker_BrokerContext_hpp_
-
-namespace qpid {
-namespace broker {
-
-class BrokerAsyncContext
-{
-public:
- virtual ~BrokerAsyncContext() {}
-};
-
-}} // namespace qpid::broker
-
-#endif // qpid_broker_BrokerContext_hpp_
diff --git a/cpp/src/qpid/broker/MessageHandle.h b/cpp/src/qpid/broker/MessageHandle.h
index 74c38d92cc..9339d81f32 100644
--- a/cpp/src/qpid/broker/MessageHandle.h
+++ b/cpp/src/qpid/broker/MessageHandle.h
@@ -32,7 +32,8 @@
namespace qpid {
namespace broker {
-class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>, public IdHandle
+class MessageHandle : public qpid::messaging::Handle<qpid::asyncStore::MessageHandleImpl>,
+ public IdHandle
{
public:
MessageHandle(qpid::asyncStore::MessageHandleImpl* p = 0);
@@ -44,8 +45,8 @@ public:
// <none>
private:
- typedef qpid::asyncStore::MessageHandleImpl Impl;
- Impl* impl;
+ //typedef qpid::asyncStore::MessageHandleImpl Impl;
+ //Impl* impl;
friend class qpid::messaging::PrivateImplRef<MessageHandle>;
};
diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp
index 7ba28eb293..957248b522 100644
--- a/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -21,7 +21,8 @@
#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/MessageStore.h"
+//#include "qpid/broker/AsyncStore.h"
#include <iostream>
using namespace qpid::broker;
@@ -29,13 +30,12 @@ using namespace qpid::broker;
namespace qpid {
namespace broker {
-class MessageStore;
-
PersistableMessage::~PersistableMessage() {}
PersistableMessage::PersistableMessage() :
asyncDequeueCounter(0),
- store(0)
+ store(0),
+ asyncStore(0)
{}
void PersistableMessage::flush()
@@ -78,8 +78,8 @@ bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
return false;
}
-
-void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+// deprecated
+void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
@@ -88,7 +88,22 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa
}
}
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ asyncStore = _store;
+ boost::weak_ptr<PersistableQueue> q(queue);
+ synclist.push_back(q);
+ }
+}
+
+// deprecated
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+ addToSyncList(queue, _store);
+ enqueueStart();
+}
+
+void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
addToSyncList(queue, _store);
enqueueStart();
}
@@ -111,7 +126,8 @@ void PersistableMessage::dequeueComplete() {
if (notify) allDequeuesComplete();
}
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
+// deprecated
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
store = _store;
@@ -121,6 +137,16 @@ void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, Messag
dequeueAsync();
}
+void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, AsyncStore* _store) {
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ asyncStore = _store;
+ boost::weak_ptr<PersistableQueue> q(queue);
+ synclist.push_back(q);
+ }
+ dequeueAsync();
+}
+
void PersistableMessage::dequeueAsync() {
sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
asyncDequeueCounter++;
@@ -128,11 +154,17 @@ void PersistableMessage::dequeueAsync() {
PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {}
+// deprecated
void PersistableMessage::setStore(MessageStore* s)
{
store = s;
}
+void PersistableMessage::setStore(AsyncStore* s)
+{
+ asyncStore = s;
+}
+
void PersistableMessage::requestContentRelease()
{
contentReleaseState.requested = true;
diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h
index d29c2c45b4..8823cfa638 100644
--- a/cpp/src/qpid/broker/PersistableMessage.h
+++ b/cpp/src/qpid/broker/PersistableMessage.h
@@ -37,6 +37,7 @@ namespace qpid {
namespace broker {
class MessageStore;
+class AsyncStore;
/**
* Base class for persistable messages.
@@ -86,7 +87,8 @@ class PersistableMessage : public Persistable
void setContentReleased();
- MessageStore* store;
+ MessageStore* store; // deprecated, use AsyncStore
+ AsyncStore* asyncStore; // new AsyncStore interface
public:
@@ -105,7 +107,8 @@ class PersistableMessage : public Persistable
QPID_BROKER_EXTERN bool isContentReleased() const;
- QPID_BROKER_EXTERN void setStore(MessageStore*);
+ QPID_BROKER_EXTERN void setStore(MessageStore*); // deprecated
+ QPID_BROKER_EXTERN void setStore(AsyncStore*);
void requestContentRelease();
void blockContentRelease();
bool checkContentReleasable();
@@ -121,20 +124,25 @@ class PersistableMessage : public Persistable
QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); }
QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); }
- QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
+ QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, // deprecated
MessageStore* _store);
+ QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
+ AsyncStore* _store);
QPID_BROKER_EXTERN bool isDequeueComplete();
QPID_BROKER_EXTERN void dequeueComplete();
- QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
+ QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, // deprecated
MessageStore* _store);
+ QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
+ AsyncStore* _store);
bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
- void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
+ void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); // deprecated
+ void addToSyncList(PersistableQueue::shared_ptr queue, AsyncStore* _store);
};
}}
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index 2bce8c058e..057879a602 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -380,6 +380,9 @@ endif (UNIX)
# Async store perf test (asyncPerf)
set (asyncStorePerf_SOURCES
storePerftools/asyncPerf/MessageAsyncContext.cpp
+ storePerftools/asyncPerf/MessageConsumer.cpp
+ storePerftools/asyncPerf/MessageDeque.cpp
+ storePerftools/asyncPerf/MessageProducer.cpp
storePerftools/asyncPerf/MockPersistableMessage.cpp
storePerftools/asyncPerf/MockPersistableQueue.cpp
storePerftools/asyncPerf/MockTransactionContext.cpp
@@ -396,7 +399,6 @@ set (asyncStorePerf_SOURCES
storePerftools/common/ScopedTimer.cpp
storePerftools/common/Streamable.cpp
storePerftools/common/TestOptions.cpp
- storePerftools/common/TestParameters.cpp
storePerftools/common/TestResult.cpp
storePerftools/common/Thread.cpp
)
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 0058aa5133..fb429ca981 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -106,7 +106,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
//Test basic delivery:
intrusive_ptr<Message> msg1 = create_message("e", "A");
- msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
+ msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);
@@ -121,7 +121,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
QPID_AUTO_TEST_CASE(testAsyncMessageCount){
Queue::shared_ptr queue(new Queue("my_test_queue", true));
intrusive_ptr<Message> msg1 = create_message("e", "A");
- msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
+ msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);
diff --git a/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h b/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h
new file mode 100644
index 0000000000..b777234616
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/AtomicCounter.h
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file AtomicCounter.h
+ */
+
+#ifndef tests_storePerftools_asyncPerf_AtomicCounter_h_
+#define tests_storePerftools_asyncPerf_AtomicCounter_h_
+
+#include "qpid/sys/Condition.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+template <class T>
+class AtomicCounter
+{
+public:
+ AtomicCounter(const T& initValue = T(0)) :
+ m_cnt(initValue),
+ m_cntMutex(),
+ m_cntCondition()
+ {}
+
+ virtual ~AtomicCounter()
+ {}
+
+ T&
+ get() const
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ return m_cnt;
+ }
+
+ void
+ operator++()
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ ++m_cnt;
+ }
+
+ void
+ operator--()
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ if (--m_cnt == 0) {
+ m_cntCondition.notify();
+ }
+ }
+
+ void
+ waitForZero(const qpid::sys::Duration& d)
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_cntMutex);
+ while (m_cnt != 0) {
+ m_cntCondition.wait(m_cntMutex, qpid::sys::AbsTime(qpid::sys::AbsTime(), d));
+ }
+ }
+
+protected:
+ T m_cnt;
+ mutable qpid::sys::Mutex m_cntMutex;
+ qpid::sys::Condition m_cntCondition;
+};
+
+typedef AtomicCounter<uint32_t> AsyncOpCounter;
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_AtomicCounter_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
index d88b6570a1..ad67bdd32f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.cpp
@@ -23,50 +23,52 @@
#include "MessageAsyncContext.h"
+#include <cassert>
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-MessageContext::MessageContext(MockPersistableMessage::shared_ptr msg,
- const qpid::asyncStore::AsyncOperation::opCode op,
- MockPersistableQueue* q) :
+MessageAsyncContext::MessageAsyncContext(boost::shared_ptr<MockPersistableMessage> msg,
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ boost::shared_ptr<MockPersistableQueue> q) :
m_msg(msg),
m_op(op),
m_q(q)
{
assert(m_msg.get() != 0);
- assert(m_q != 0);
+ assert(m_q.get() != 0);
}
-MessageContext::~MessageContext()
+MessageAsyncContext::~MessageAsyncContext()
{}
qpid::asyncStore::AsyncOperation::opCode
-MessageContext::getOpCode() const
+MessageAsyncContext::getOpCode() const
{
return m_op;
}
const char*
-MessageContext::getOpStr() const
+MessageAsyncContext::getOpStr() const
{
return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
}
-MockPersistableMessage::shared_ptr
-MessageContext::getMessage() const
+boost::shared_ptr<MockPersistableMessage>
+MessageAsyncContext::getMessage() const
{
return m_msg;
}
-MockPersistableQueue*
-MessageContext::getQueue() const
+boost::shared_ptr<MockPersistableQueue>
+MessageAsyncContext::getQueue() const
{
return m_q;
}
void
-MessageContext::destroy()
+MessageAsyncContext::destroy()
{
delete this;
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
index 3a47b4dbe8..11da0d80bd 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageAsyncContext.h
@@ -24,31 +24,35 @@
#ifndef tests_storePerfTools_asyncPerf_MessageContext_h_
#define tests_storePerfTools_asyncPerf_MessageContext_h_
-#include "MockPersistableMessage.h"
-
#include "qpid/asyncStore/AsyncOperation.h"
-#include "qpid/broker/BrokerAsyncContext.h"
+#include "qpid/broker/AsyncStore.h" // qpid::broker::BrokerAsyncContext
+
+#include <boost/shared_ptr.hpp>
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class MessageContext : public qpid::broker::BrokerAsyncContext
+class MockPersistableMessage;
+class MockPersistableQueue;
+
+class MessageAsyncContext : public qpid::broker::BrokerAsyncContext
{
public:
- MessageContext(MockPersistableMessage::shared_ptr msg,
- const qpid::asyncStore::AsyncOperation::opCode op,
- MockPersistableQueue* q);
- virtual ~MessageContext();
+ MessageAsyncContext(boost::shared_ptr<MockPersistableMessage> msg,
+ const qpid::asyncStore::AsyncOperation::opCode op,
+ boost::shared_ptr<MockPersistableQueue> q);
+ virtual ~MessageAsyncContext();
qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
const char* getOpStr() const;
- MockPersistableMessage::shared_ptr getMessage() const;
- MockPersistableQueue* getQueue() const;
+ boost::shared_ptr<MockPersistableMessage> getMessage() const;
+ boost::shared_ptr<MockPersistableQueue> getQueue() const;
void destroy();
+
protected:
- MockPersistableMessage::shared_ptr m_msg;
+ boost::shared_ptr<MockPersistableMessage> m_msg;
const qpid::asyncStore::AsyncOperation::opCode m_op;
- MockPersistableQueue* m_q;
+ boost::shared_ptr<MockPersistableQueue> m_q;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
new file mode 100644
index 0000000000..6042291a0a
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MessageConsumer.cpp
+ */
+
+#include "MessageConsumer.h"
+
+#include "MockPersistableQueue.h"
+#include "TestOptions.h"
+
+#include <stdint.h> // uint32_t
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MockTransactionContext;
+
+MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
+ boost::shared_ptr<MockPersistableQueue> queue) :
+ m_perfTestParams(perfTestParams),
+ m_queue(queue)
+{}
+
+MessageConsumer::~MessageConsumer()
+{}
+
+void*
+MessageConsumer::runConsumers()
+{
+ uint32_t numMsgs = 0;
+ while (numMsgs < m_perfTestParams.m_numMsgs) {
+ if (m_queue->dispatch()) {
+ ++numMsgs;
+ } else {
+ ::usleep(1000); // TODO - replace this poller with condition variable
+ }
+ }
+ return 0;
+}
+
+//static
+void*
+MessageConsumer::startConsumers(void* ptr)
+{
+ return reinterpret_cast<MessageConsumer*>(ptr)->runConsumers();
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
new file mode 100644
index 0000000000..30305fbe1a
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MessageConsumer.h
+ */
+
+#ifndef tests_storePerftools_asyncPerf_MessageConsumer_h_
+#define tests_storePerftools_asyncPerf_MessageConsumer_h_
+
+#include "boost/shared_ptr.hpp"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MockPersistableQueue;
+class TestOptions;
+
+class MessageConsumer
+{
+public:
+ MessageConsumer(const TestOptions& perfTestParams,
+ boost::shared_ptr<MockPersistableQueue> queue);
+ virtual ~MessageConsumer();
+
+ void* runConsumers();
+ static void* startConsumers(void* ptr);
+protected:
+ const TestOptions& m_perfTestParams;
+ boost::shared_ptr<MockPersistableQueue> m_queue;
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_MessageConsumer_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
new file mode 100644
index 0000000000..c61ce352a1
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.cpp
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MessageDeque.cpp
+ */
+
+#include "MessageDeque.h"
+#include "QueuedMessage.h"
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+MessageDeque::MessageDeque()
+{}
+
+MessageDeque::~MessageDeque()
+{}
+
+uint32_t
+MessageDeque::size()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
+ return m_messages.size();
+}
+
+bool
+MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*removed*/)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
+ m_messages.push_back(added);
+ return false;
+}
+
+bool
+MessageDeque::consume(QueuedMessage& msg)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_msgMutex);
+ if (!m_messages.empty()) {
+ msg = m_messages.front();
+ m_messages.pop_front();
+ return true;
+ }
+ return false;
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
new file mode 100644
index 0000000000..93ca099923
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageDeque.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MessageDeque.h
+ */
+
+/*
+ * This is a copy of qpid::broker::MessageDeque.h, but using the local
+ * tests::storePerftools::asyncPerf::QueuedMessage class instead of
+ * qpid::broker::QueuedMessage.
+ */
+
+#ifndef tests_storePerftools_asyncPerf_MessageDeque_h_
+#define tests_storePerftools_asyncPerf_MessageDeque_h_
+
+#include "Messages.h"
+
+#include "qpid/sys/Mutex.h"
+
+#include <deque>
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MessageDeque : public Messages
+{
+public:
+ MessageDeque();
+ virtual ~MessageDeque();
+ uint32_t size();
+ bool push(const QueuedMessage& added, QueuedMessage& removed);
+ bool consume(QueuedMessage& msg);
+protected:
+ std::deque<QueuedMessage> m_messages;
+ qpid::sys::Mutex m_msgMutex;
+
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_MessageDeque_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
new file mode 100644
index 0000000000..8540ff2b61
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MessageProducer.cpp
+ */
+
+#include "MessageProducer.h"
+
+#include "MockPersistableMessage.h"
+#include "MockPersistableQueue.h"
+#include "TestOptions.h"
+
+#include <stdint.h> // uint32_t
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MockTransactionContext;
+
+MessageProducer::MessageProducer(const TestOptions& perfTestParams,
+ const char* msgData,
+ qpid::asyncStore::AsyncStoreImpl* store,
+ boost::shared_ptr<MockPersistableQueue> queue) :
+ m_perfTestParams(perfTestParams),
+ m_msgData(msgData),
+ m_store(store),
+ m_queue(queue)
+{}
+
+MessageProducer::~MessageProducer()
+{}
+
+void*
+MessageProducer::runProducers()
+{
+ boost::shared_ptr<MockTransactionContext> txn;
+ for (uint32_t numMsgs=0; numMsgs<m_perfTestParams.m_numMsgs; ++numMsgs) {
+ boost::shared_ptr<MockPersistableMessage> msg(new MockPersistableMessage(m_msgData, m_perfTestParams.m_msgSize, m_store));
+ m_queue->deliver(msg);
+ }
+ return 0;
+}
+
+//static
+void*
+MessageProducer::startProducers(void* ptr)
+{
+ return reinterpret_cast<MessageProducer*>(ptr)->runProducers();
+}
+
+}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
new file mode 100644
index 0000000000..1b1f9b63fd
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file MessageProducer.h
+ */
+
+#ifndef tests_storePerftools_asyncPerf_MessageProducer_h_
+#define tests_storePerftools_asyncPerf_MessageProducer_h_
+
+#include "boost/shared_ptr.hpp"
+
+namespace qpid {
+namespace asyncStore {
+
+class AsyncStoreImpl;
+
+}}
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class MockPersistableQueue;
+class TestOptions;
+
+class MessageProducer
+{
+public:
+ MessageProducer(const TestOptions& perfTestParams,
+ const char* msgData,
+ qpid::asyncStore::AsyncStoreImpl* store,
+ boost::shared_ptr<MockPersistableQueue> queue);
+ virtual ~MessageProducer();
+ void* runProducers();
+ static void* startProducers(void* ptr);
+protected:
+ const TestOptions& m_perfTestParams;
+ const char* m_msgData;
+ qpid::asyncStore::AsyncStoreImpl* m_store;
+ boost::shared_ptr<MockPersistableQueue> m_queue;
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_MessageProducer_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/Messages.h b/cpp/src/tests/storePerftools/asyncPerf/Messages.h
new file mode 100644
index 0000000000..9b5bd0be99
--- /dev/null
+++ b/cpp/src/tests/storePerftools/asyncPerf/Messages.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file Messages.h
+ */
+
+/*
+ * This is a copy of qpid::broker::Messages.h, but using the local
+ * tests::storePerftools::asyncPerf::QueuedMessage class instead of
+ * qpid::broker::QueuedMessage.
+ */
+
+#ifndef tests_storePerftools_asyncPerf_Messages_h_
+#define tests_storePerftools_asyncPerf_Messages_h_
+
+#include <stdint.h>
+
+namespace tests {
+namespace storePerftools {
+namespace asyncPerf {
+
+class QueuedMessage;
+
+class Messages
+{
+public:
+ virtual ~Messages() {}
+ virtual uint32_t size() = 0;
+ virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
+ virtual bool consume(QueuedMessage& msg) = 0;
+};
+
+}}} // namespace tests::storePerftools::asyncPerf
+
+#endif // tests_storePerftools_asyncPerf_Messages_h_
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp
index e89a98d02d..e7cab4d621 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp
@@ -23,9 +23,6 @@
#include "MockPersistableMessage.h"
-#include "MessageAsyncContext.h"
-#include "MockPersistableQueue.h" // debug statements in enqueueComplete() and dequeueComplete()
-
#include "qpid/asyncStore/AsyncStoreImpl.h"
namespace tests {
@@ -34,46 +31,19 @@ namespace asyncPerf {
MockPersistableMessage::MockPersistableMessage(const char* msgData,
const uint32_t msgSize,
- qpid::asyncStore::AsyncStoreImpl* store,
- const bool persistent) :
+ qpid::asyncStore::AsyncStoreImpl* store) :
m_persistenceId(0ULL),
m_msg(msgData, static_cast<size_t>(msgSize)),
- m_persistent(persistent),
- m_msgHandle(store->createMessageHandle(this))
+ m_msgHandle(store ? store->createMessageHandle(this) : store->createMessageHandle(0))
{}
MockPersistableMessage::~MockPersistableMessage()
{}
-// static
-void
-MockPersistableMessage::handleAsyncResult(const qpid::broker::AsyncResult* res,
- qpid::broker::BrokerAsyncContext* bc)
+const qpid::broker::MessageHandle&
+MockPersistableMessage::getHandle() const
{
- if (bc) {
- MessageContext* mc = dynamic_cast<MessageContext*>(bc);
- if (res->errNo) {
- // TODO: Handle async failure here
- std::cerr << "Message pid=0x" << std::hex << mc->getMessage()->m_persistenceId << std::dec << ": Operation "
- << mc->getOpStr() << ": failure " << res->errNo << " (" << res->errMsg << ")" << std::endl;
- } else {
- // Handle async success here
- switch(mc->getOpCode()) {
- case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE:
- mc->getMessage()->dequeueComplete(mc);
- break;
- case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE:
- mc->getMessage()->enqueueComplete(mc);
- break;
- default:
- std::ostringstream oss;
- oss << "tests::storePerftools::asyncPerf::MockPersistableMessage::handleAsyncResult(): Unknown async queue operation: " << mc->getOpCode();
- throw qpid::Exception(oss.str());
- };
- }
- }
- if (bc) delete bc;
- if (res) delete res;
+ return m_msgHandle;
}
qpid::broker::MessageHandle&
@@ -119,7 +89,7 @@ MockPersistableMessage::encodedHeaderSize() const
bool
MockPersistableMessage::isPersistent() const
{
- return m_persistent;
+ return m_msgHandle.isValid();
}
uint64_t
@@ -134,20 +104,4 @@ MockPersistableMessage::write(char* target)
::memcpy(target, m_msg.data(), m_msg.size());
}
-// protected
-void
-MockPersistableMessage::enqueueComplete(const MessageContext* mc)
-{
-//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": enqueueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush;
- assert(mc->getMessage().get() == this);
-}
-
-// protected
-void
-MockPersistableMessage::dequeueComplete(const MessageContext* mc)
-{
-//std::cout << "~~~~~ Message pid=0x" << std::hex << mc->m_msg->getPersistenceId() << std::dec << ": dequeueComplete() on queue \"" << mc->m_q->getName() << "\"" << std::endl << std::flush;
- assert(mc->getMessage().get() == this);
-}
-
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h
index c98bb8e843..fc1c3ee47a 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.h
@@ -28,6 +28,8 @@
#include "qpid/broker/MessageHandle.h"
#include "qpid/broker/PersistableMessage.h"
+#include <set>
+
namespace qpid {
namespace asyncStore {
class AsyncStoreImpl;
@@ -37,21 +39,17 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class MessageContext;
class MockPersistableQueue;
-class MockPersistableMessage: public qpid::broker::PersistableMessage, public qpid::broker::DataSource
+class MockPersistableMessage: public qpid::broker::PersistableMessage,
+ public qpid::broker::DataSource
{
public:
- typedef boost::shared_ptr<MockPersistableMessage> shared_ptr;
-
MockPersistableMessage(const char* msgData,
const uint32_t msgSize,
- qpid::asyncStore::AsyncStoreImpl* store,
- const bool persistent = true);
+ qpid::asyncStore::AsyncStoreImpl* store);
virtual ~MockPersistableMessage();
- static void handleAsyncResult(const qpid::broker::AsyncResult* res,
- qpid::broker::BrokerAsyncContext* bc);
+ const qpid::broker::MessageHandle& getHandle() const;
qpid::broker::MessageHandle& getHandle();
// Interface Persistable
@@ -72,13 +70,7 @@ public:
protected:
mutable uint64_t m_persistenceId;
const std::string m_msg;
- const bool m_persistent;
qpid::broker::MessageHandle m_msgHandle;
-
- // --- Ascnc op completions (called through handleAsyncResult) ---
- void enqueueComplete(const MessageContext* mc);
- void dequeueComplete(const MessageContext* mc);
-
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
index ede0830045..009f54a157 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
@@ -23,16 +23,13 @@
#include "MockPersistableQueue.h"
-#include "MessageAsyncContext.h"
+#include "MessageDeque.h"
#include "MockPersistableMessage.h"
#include "MockTransactionContext.h"
#include "QueueAsyncContext.h"
#include "QueuedMessage.h"
-#include "TestOptions.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
-#include "qpid/broker/BrokerAsyncContext.h"
-#include "qpid/broker/EnqueueHandle.h"
namespace tests {
namespace storePerftools {
@@ -40,19 +37,22 @@ namespace asyncPerf {
MockPersistableQueue::MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
- qpid::asyncStore::AsyncStoreImpl* store,
- const TestOptions& to,
- const char* msgData) :
+ qpid::asyncStore::AsyncStoreImpl* store) :
qpid::broker::PersistableQueue(),
m_name(name),
m_store(store),
+ m_asyncOpCounter(0UL),
m_persistenceId(0ULL),
m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this.
- m_perfTestOpts(to),
- m_msgData(msgData)
+ m_destroyPending(false),
+ m_destroyed(false),
+ m_barrier(*this),
+ m_messages(new MessageDeque())
{
- const qpid::types::Variant::Map qo;
- m_queueHandle = m_store->createQueueHandle(m_name, qo);
+ if (m_store != 0) {
+ const qpid::types::Variant::Map qo;
+ m_queueHandle = m_store->createQueueHandle(m_name, qo);
+ }
}
MockPersistableQueue::~MockPersistableQueue()
@@ -71,7 +71,7 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
if (bc && res) {
QueueAsyncContext* qc = dynamic_cast<QueueAsyncContext*>(bc);
if (res->errNo) {
- // TODO: Handle async failure here
+ // TODO: Handle async failure here (other than by simply printing a message)
std::cerr << "Queue name=\"" << qc->getQueue()->m_name << "\": Operation " << qc->getOpStr() << ": failure "
<< res->errNo << " (" << res->errMsg << ")" << std::endl;
} else {
@@ -86,6 +86,12 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
case qpid::asyncStore::AsyncOperation::QUEUE_DESTROY:
qc->getQueue()->destroyComplete(qc);
break;
+ case qpid::asyncStore::AsyncOperation::MSG_ENQUEUE:
+ qc->getQueue()->enqueueComplete(qc);
+ break;
+ case qpid::asyncStore::AsyncOperation::MSG_DEQUEUE:
+ qc->getQueue()->dequeueComplete(qc);
+ break;
default:
std::ostringstream oss;
oss << "tests::storePerftools::asyncPerf::MockPersistableQueue::handleAsyncResult(): Unknown async queue operation: " << qc->getOpCode();
@@ -97,127 +103,100 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
if (res) delete res;
}
+const qpid::broker::QueueHandle&
+MockPersistableQueue::getHandle() const
+{
+ return m_queueHandle;
+}
+
qpid::broker::QueueHandle&
MockPersistableQueue::getHandle()
{
return m_queueHandle;
}
-void
-MockPersistableQueue::asyncStoreCreate()
+qpid::asyncStore::AsyncStoreImpl*
+MockPersistableQueue::getStore()
{
- m_store->submitCreate(m_queueHandle,
- this,
- &handleAsyncResult,
- new QueueAsyncContext(this, qpid::asyncStore::AsyncOperation::QUEUE_CREATE));
+ return m_store;
}
void
-MockPersistableQueue::asyncStoreDestroy()
+MockPersistableQueue::asyncCreate()
{
- m_store->submitDestroy(m_queueHandle,
- &handleAsyncResult,
- new QueueAsyncContext(this, qpid::asyncStore::AsyncOperation::QUEUE_DESTROY));
+ if (m_store) {
+ m_store->submitCreate(m_queueHandle,
+ this,
+ &handleAsyncResult,
+ new QueueAsyncContext(shared_from_this(),
+ qpid::asyncStore::AsyncOperation::QUEUE_CREATE));
+ ++m_asyncOpCounter;
+ }
}
-void*
-MockPersistableQueue::runEnqueues()
+void
+MockPersistableQueue::asyncDestroy(const bool deleteQueue)
{
- uint32_t numMsgs = 0;
- uint16_t txnCnt = 0;
- const bool useTxn = m_perfTestOpts.m_enqTxnBlockSize > 0;
- MockTransactionContextPtr txn;
- while (numMsgs < m_perfTestOpts.m_numMsgs) {
- if (useTxn && txnCnt == 0) {
- txn.reset(new MockTransactionContext(m_store)); // equivalent to begin()
+ m_destroyPending = true;
+ if (m_store) {
+ if (deleteQueue) {
+ m_store->submitDestroy(m_queueHandle,
+ &handleAsyncResult,
+ new QueueAsyncContext(shared_from_this(),
+ qpid::asyncStore::AsyncOperation::QUEUE_DESTROY));
+ ++m_asyncOpCounter;
}
- MockPersistableMessage::shared_ptr msg(new MockPersistableMessage(m_msgData, m_perfTestOpts.m_msgSize, m_store, true));
- msg->setPersistenceId(m_store->getNextRid());
- qpid::broker::EnqueueHandle enqHandle = m_store->createEnqueueHandle(msg->getHandle(), m_queueHandle);
- MessageContext* msgCtxt = new MessageContext(msg,
- qpid::asyncStore::AsyncOperation::MSG_ENQUEUE,
- this);
- if (useTxn) {
- m_store->submitEnqueue(enqHandle,
- txn->getHandle(),
- &MockPersistableMessage::handleAsyncResult,
- dynamic_cast<qpid::broker::BrokerAsyncContext*>(msgCtxt));
- } else {
- m_store->submitEnqueue(enqHandle,
- &MockPersistableMessage::handleAsyncResult,
- dynamic_cast<qpid::broker::BrokerAsyncContext*>(msgCtxt));
- }
- QueuedMessagePtr qm(new QueuedMessage(msg, enqHandle, txn));
- push(qm);
- if (useTxn && ++txnCnt >= m_perfTestOpts.m_enqTxnBlockSize) {
- txn->commit();
- txnCnt = 0;
- }
- ++numMsgs;
- }
- if (txnCnt > 0) {
- txn->commit();
- txnCnt = 0;
+ m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000));
}
- return 0;
}
-void*
-MockPersistableQueue::runDequeues()
+void
+MockPersistableQueue::deliver(boost::shared_ptr<MockPersistableMessage> msg)
{
- uint32_t numMsgs = 0;
- const uint32_t numMsgsToDequeue = m_perfTestOpts.m_numMsgs * m_perfTestOpts.m_numEnqThreadsPerQueue / m_perfTestOpts.m_numDeqThreadsPerQueue;
- uint16_t txnCnt = 0;
- const bool useTxn = m_perfTestOpts.m_deqTxnBlockSize > 0;
- MockTransactionContextPtr txn;
- QueuedMessagePtr qm;
- while (numMsgs < numMsgsToDequeue) {
- if (useTxn && txnCnt == 0) {
- txn.reset(new MockTransactionContext(m_store)); // equivalent to begin()
- }
- pop(qm);
- if (qm.get()) {
- qpid::broker::EnqueueHandle enqHandle = qm->getEnqueueHandle();
- qpid::broker::BrokerAsyncContext* bc = new MessageContext(qm->getMessage(),
- qpid::asyncStore::AsyncOperation::MSG_DEQUEUE,
- this);
- if (useTxn) {
- m_store->submitDequeue(enqHandle,
- txn->getHandle(),
- &MockPersistableMessage::handleAsyncResult,
- bc);
- } else {
- m_store->submitDequeue(enqHandle,
- &MockPersistableMessage::handleAsyncResult,
- bc);
- }
- ++numMsgs;
- qm.reset(static_cast<QueuedMessage*>(0));
- if (useTxn && ++txnCnt >= m_perfTestOpts.m_deqTxnBlockSize) {
- txn->commit();
- txnCnt = 0;
- }
- }
+ QueuedMessage qm(this, msg);
+ if(enqueue((MockTransactionContext*)0, qm)) {
+ push(qm);
}
- if (txnCnt > 0) {
- txn->commit();
- txnCnt = 0;
+}
+
+bool
+MockPersistableQueue::dispatch()
+{
+ QueuedMessage qm;
+ if (m_messages->consume(qm)) {
+ return dequeue((MockTransactionContext*)0, qm);
}
- return 0;
+ return false;
}
-//static
-void*
-MockPersistableQueue::startEnqueues(void* ptr)
+bool
+MockPersistableQueue::enqueue(MockTransactionContext* ctxt,
+ QueuedMessage& qm)
{
- return reinterpret_cast<MockPersistableQueue*>(ptr)->runEnqueues();
+ ScopedUse u(m_barrier);
+ if (!u.m_acquired) {
+ return false;
+ }
+ if (qm.payload()->isPersistent() && m_store) {
+ qm.payload()->enqueueAsync(shared_from_this(), m_store);
+ return asyncEnqueue(ctxt, qm);
+ }
+ return false;
}
-//static
-void*
-MockPersistableQueue::startDequeues(void* ptr)
+bool
+MockPersistableQueue::dequeue(MockTransactionContext* ctxt,
+ QueuedMessage& qm)
{
- return reinterpret_cast<MockPersistableQueue*>(ptr)->runDequeues();
+ ScopedUse u(m_barrier);
+ if (!u.m_acquired) {
+ return false;
+ }
+ if (qm.payload()->isPersistent() && m_store) {
+ qm.payload()->dequeueAsync(shared_from_this(), m_store);
+ return asyncDequeue(ctxt, qm);
+ }
+ return false;
}
void
@@ -276,61 +255,158 @@ MockPersistableQueue::write(char* target)
::memcpy(target, m_persistableData.data(), m_persistableData.size());
}
+// --- Members & methods in msg handling path from qpid::Queue ---
+
+// protected
+MockPersistableQueue::UsageBarrier::UsageBarrier(MockPersistableQueue& q) :
+ m_parent(q),
+ m_count(0)
+{}
+
+// protected
+bool
+MockPersistableQueue::UsageBarrier::acquire()
+{
+ qpid::sys::Monitor::ScopedLock l(m_monitor);
+ if (m_parent.m_destroyed) {
+ return false;
+ } else {
+ ++m_count;
+ return true;
+ }
+}
+
+// protected
+void MockPersistableQueue::UsageBarrier::release()
+{
+ qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
+ if (--m_count == 0) {
+ m_monitor.notifyAll();
+ }
+}
+
+// protected
+void MockPersistableQueue::UsageBarrier::destroy()
+{
+ qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor);
+ m_parent.m_destroyed = true;
+ while (m_count) {
+ m_monitor.wait();
+ }
+}
+
+// protected
+MockPersistableQueue::ScopedUse::ScopedUse(UsageBarrier& b) :
+ m_barrier(b),
+ m_acquired(m_barrier.acquire())
+{}
+
+// protected
+MockPersistableQueue::ScopedUse::~ScopedUse()
+{
+ if (m_acquired) {
+ m_barrier.release();
+ }
+}
+
+// protected
+void
+MockPersistableQueue::push(QueuedMessage& qm,
+ bool /*isRecovery*/)
+{
+ QueuedMessage removed;
+ m_messages->push(qm, removed);
+}
+
+// --- End Members & methods in msg handling path from qpid::Queue ---
+
+// protected
+bool
+MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn,
+ QueuedMessage& qm)
+{
+ qm.payload()->setPersistenceId(m_store->getNextRid());
+//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
+ m_store->submitEnqueue(/*enqHandle*/qm.enqHandle(),
+ txn->getHandle(),
+ &handleAsyncResult,
+ new QueueAsyncContext(shared_from_this(),
+ qm.payload(),
+ qpid::asyncStore::AsyncOperation::MSG_ENQUEUE));
+ ++m_asyncOpCounter;
+ return true;
+}
+
+// protected
+bool
+MockPersistableQueue::asyncDequeue(MockTransactionContext* txn,
+ QueuedMessage& qm)
+{
+//std::cout << "QQQ Queue=\"" << m_name << "\": asyncDequeue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
+ qpid::broker::EnqueueHandle enqHandle = qm.enqHandle();
+ m_store->submitDequeue(enqHandle,
+ txn->getHandle(),
+ &handleAsyncResult,
+ new QueueAsyncContext(shared_from_this(),
+ qm.payload(),
+ qpid::asyncStore::AsyncOperation::MSG_DEQUEUE));
+ ++m_asyncOpCounter;
+ return true;
+}
+
+// protected
+void
+MockPersistableQueue::destroyCheck(const std::string& opDescr) const
+{
+ if (m_destroyPending || m_destroyed) {
+ std::ostringstream oss;
+ oss << opDescr << " on queue \"" << m_name << "\" after call to destroy";
+ throw qpid::Exception(oss.str());
+ }
+}
+
// protected
void
MockPersistableQueue::createComplete(const QueueAsyncContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": createComplete()" << std::endl << std::flush;
+//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": createComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
+ --m_asyncOpCounter;
}
// protected
void
MockPersistableQueue::flushComplete(const QueueAsyncContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": flushComplete()" << std::endl << std::flush;
+//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": flushComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
+ --m_asyncOpCounter;
}
// protected
void
MockPersistableQueue::destroyComplete(const QueueAsyncContext* qc)
{
-//std::cout << "~~~~~ Queue name=\"" << qc->m_q->getName() << "\": destroyComplete()" << std::endl << std::flush;
+//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": destroyComplete()" << std::endl << std::flush;
assert(qc->getQueue().get() == this);
+ --m_asyncOpCounter;
+ m_destroyed = true;
}
-// protected
void
-MockPersistableQueue::push(QueuedMessagePtr& qm)
+MockPersistableQueue::enqueueComplete(const QueueAsyncContext* qc)
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
- m_enqueuedMsgs.push_back(qm);
- m_dequeueCondition.notify();
+//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": enqueueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
+ assert(qc->getQueue().get() == this);
+ --m_asyncOpCounter;
}
-// protected
void
-MockPersistableQueue::pop(QueuedMessagePtr& qm)
+MockPersistableQueue::dequeueComplete(const QueueAsyncContext* qc)
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
- while (m_enqueuedMsgs.empty()) {
- m_dequeueCondition.wait(m_enqueuedMsgsMutex);
- }
- qm = m_enqueuedMsgs.front();
- if (qm->isTransactional()) {
- // The next msg is still in an open transaction, skip and find next non-open-txn msg
- MsgEnqListItr i = m_enqueuedMsgs.begin();
- while (++i != m_enqueuedMsgs.end()) {
- if (!(*i)->isTransactional()) {
- qm = *i;
- m_enqueuedMsgs.erase(i);
- }
- }
- } else {
- // The next msg is not in an open txn
- m_enqueuedMsgs.pop_front();
- }
+//std::cout << "QQQ Queue name=\"" << qc->getQueue()->getName() << "\": dequeueComplete() rid=0x" << std::hex << qc->getMessage()->getPersistenceId() << std::dec << std::endl << std::flush;
+ assert(qc->getQueue().get() == this);
+ --m_asyncOpCounter;
}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
index 2d7b1f1c4e..ff6db93542 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
@@ -24,19 +24,21 @@
#ifndef tests_storePerftools_asyncPerf_MockPersistableQueue_h_
#define tests_storePerftools_asyncPerf_MockPersistableQueue_h_
+#include "AtomicCounter.h" // AsyncOpCounter
+
#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/QueueHandle.h"
-#include "qpid/sys/Condition.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
-#include <boost/intrusive_ptr.hpp>
-#include <deque>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
namespace qpid {
namespace asyncStore {
class AsyncStoreImpl;
}
+
namespace framing {
class FieldTable;
}}
@@ -45,36 +47,39 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class Messages;
+class MockPersistableMessage;
+class MockPersistableQueue;
+class MockTransactionContext;
class QueueAsyncContext;
class QueuedMessage;
-class TestOptions;
-
-typedef boost::shared_ptr<QueuedMessage> QueuedMessagePtr;
-class MockPersistableQueue : public qpid::broker::PersistableQueue, public qpid::broker::DataSource
+class MockPersistableQueue : public boost::enable_shared_from_this<MockPersistableQueue>,
+ public qpid::broker::PersistableQueue,
+ public qpid::broker::DataSource
{
public:
- typedef boost::intrusive_ptr<MockPersistableQueue> intrusive_ptr;
-
MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& args,
- qpid::asyncStore::AsyncStoreImpl* store,
- const TestOptions& perfTestParams,
- const char* msgData);
+ qpid::asyncStore::AsyncStoreImpl* store);
virtual ~MockPersistableQueue();
- // --- Async functionality ---
static void handleAsyncResult(const qpid::broker::AsyncResult* res,
qpid::broker::BrokerAsyncContext* bc);
+ const qpid::broker::QueueHandle& getHandle() const;
qpid::broker::QueueHandle& getHandle();
- void asyncStoreCreate();
- void asyncStoreDestroy();
+ qpid::asyncStore::AsyncStoreImpl* getStore();
+
+ void asyncCreate();
+ void asyncDestroy(const bool deleteQueue);
- // --- Performance test thread entry points ---
- void* runEnqueues();
- void* runDequeues();
- static void* startEnqueues(void* ptr);
- static void* startDequeues(void* ptr);
+ // --- Methods in msg handling path from qpid::Queue ---
+ void deliver(boost::shared_ptr<MockPersistableMessage> msg);
+ bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param
+ bool enqueue(MockTransactionContext* ctxt,
+ QueuedMessage& qm);
+ bool dequeue(MockTransactionContext* ctxt,
+ QueuedMessage& qm);
// --- Interface qpid::broker::Persistable ---
virtual void encode(qpid::framing::Buffer& buffer) const;
@@ -94,28 +99,51 @@ public:
protected:
const std::string m_name;
qpid::asyncStore::AsyncStoreImpl* m_store;
+ AsyncOpCounter m_asyncOpCounter;
mutable uint64_t m_persistenceId;
std::string m_persistableData;
qpid::broker::QueueHandle m_queueHandle;
-
- // Test params
- const TestOptions& m_perfTestOpts;
- const char* m_msgData;
-
- typedef std::deque<QueuedMessagePtr> MsgEnqList;
- typedef MsgEnqList::iterator MsgEnqListItr;
- MsgEnqList m_enqueuedMsgs;
- qpid::sys::Mutex m_enqueuedMsgsMutex;
- qpid::sys::Condition m_dequeueCondition;
-
- // --- Ascnc op completions (called through handleAsyncResult) ---
+ bool m_destroyPending;
+ bool m_destroyed;
+
+ // --- Members & methods in msg handling path copied from qpid::Queue ---
+ struct UsageBarrier
+ {
+ MockPersistableQueue& m_parent;
+ uint32_t m_count;
+ qpid::sys::Monitor m_monitor;
+ UsageBarrier(MockPersistableQueue& q);
+ bool acquire();
+ void release();
+ void destroy();
+ };
+ struct ScopedUse
+ {
+ UsageBarrier& m_barrier;
+ const bool m_acquired;
+ ScopedUse(UsageBarrier& b);
+ ~ScopedUse();
+ };
+ UsageBarrier m_barrier;
+ std::auto_ptr<Messages> m_messages;
+ void push(QueuedMessage& qm,
+ bool isRecovery = false);
+
+ // -- Async ops ---
+ bool asyncEnqueue(MockTransactionContext* txn,
+ QueuedMessage& qm);
+ bool asyncDequeue(MockTransactionContext* txn,
+ QueuedMessage& qm);
+
+ // --- Async op counter ---
+ void destroyCheck(const std::string& opDescr) const;
+
+ // --- Async op completions (called through handleAsyncResult) ---
void createComplete(const QueueAsyncContext* qc);
void flushComplete(const QueueAsyncContext* qc);
void destroyComplete(const QueueAsyncContext* qc);
-
- // --- Queue functionality ---
- void push(QueuedMessagePtr& msg);
- void pop(QueuedMessagePtr& msg);
+ void enqueueComplete(const QueueAsyncContext* qc);
+ void dequeueComplete(const QueueAsyncContext* qc);
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
index 05f06e95a1..c444f596e5 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
@@ -23,24 +23,41 @@
#include "MockTransactionContext.h"
-#include "QueuedMessage.h"
#include "TransactionAsyncContext.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
-#include "qpid/broker/BrokerAsyncContext.h"
+
+#include <uuid/uuid.h>
namespace tests {
namespace storePerftools {
namespace asyncPerf {
+MockTransactionContext::MockTransactionContext(const std::string& xid) :
+ qpid::broker::TransactionContext(),
+ m_xid(xid),
+ m_tpcFlag(!xid.empty()),
+ m_store(0),
+ m_txnHandle(0),
+ m_prepared(false),
+ m_enqueuedMsgs()
+{
+ if (!m_tpcFlag) {
+ setLocalXid();
+ }
+//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl;
+}
+
MockTransactionContext::MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store,
const std::string& xid) :
m_store(store),
- m_txnHandle(store->createTxnHandle(xid)),
m_prepared(false),
m_enqueuedMsgs()
{
//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl;
+ if (m_store != 0) {
+ m_txnHandle = store->createTxnHandle(xid);
+ }
}
MockTransactionContext::~MockTransactionContext()
@@ -80,6 +97,12 @@ MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res,
if (res) delete res;
}
+const qpid::broker::TxnHandle&
+MockTransactionContext::getHandle() const
+{
+ return m_txnHandle;
+}
+
qpid::broker::TxnHandle&
MockTransactionContext::getHandle()
{
@@ -89,13 +112,13 @@ MockTransactionContext::getHandle()
bool
MockTransactionContext::is2pc() const
{
- return m_txnHandle.is2pc();
+ return m_tpcFlag;
}
const std::string&
MockTransactionContext::getXid() const
{
- return m_txnHandle.getXid();
+ return m_xid;
}
void
@@ -108,7 +131,7 @@ MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm)
void
MockTransactionContext::prepare()
{
- if (m_txnHandle.is2pc()) {
+ if (m_tpcFlag) {
localPrepare();
m_prepared = true;
}
@@ -126,9 +149,11 @@ MockTransactionContext::abort()
if (!m_prepared) {
localPrepare();
}
- m_store->submitAbort(m_txnHandle,
- &handleAsyncResult,
- dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT)));
+ if (m_store != 0) {
+// m_store->submitAbort(m_txnHandle,
+// &handleAsyncResult,
+// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT)));
+ }
//std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
}
@@ -145,9 +170,11 @@ MockTransactionContext::commit()
} else {
localPrepare();
}
- m_store->submitCommit(m_txnHandle,
- &handleAsyncResult,
- dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT)));
+ if (m_store != 0) {
+// m_store->submitCommit(m_txnHandle,
+// &handleAsyncResult,
+// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT)));
+ }
//std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
}
@@ -156,23 +183,38 @@ MockTransactionContext::commit()
void
MockTransactionContext::localPrepare()
{
- m_store->submitPrepare(m_txnHandle,
- &handleAsyncResult,
- dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE)));
+ if (m_store != 0) {
+// m_store->submitPrepare(m_txnHandle,
+// &handleAsyncResult,
+// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE)));
+ }
//std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl;
}
// protected
void
-MockTransactionContext::prepareComplete(const TransactionAsyncContext* tc)
+MockTransactionContext::setLocalXid()
+{
+ uuid_t uuid;
+ // TODO: Valgrind warning: Possible race condition in uuid_generate_random() - is it thread-safe, and if not, does it matter?
+ // If this race condition affects the randomness of the UUID, then there could be a problem here.
+ ::uuid_generate_random(uuid);
+ char uuidStr[37]; // 36-char uuid + trailing '\0'
+ ::uuid_unparse(uuid, uuidStr);
+ m_xid.assign(uuidStr);
+}
+
+// protected
+void
+MockTransactionContext::prepareComplete(const TransactionAsyncContext* /*tc*/)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex);
- while (!m_enqueuedMsgs.empty()) {
- m_enqueuedMsgs.front()->clearTransaction();
- m_enqueuedMsgs.pop_front();
- }
+// while (!m_enqueuedMsgs.empty()) {
+// m_enqueuedMsgs.front()->clearTransaction();
+// m_enqueuedMsgs.pop_front();
+// }
//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush;
- assert(tc->getTransactionContext() == this);
+// assert(tc->getTransactionContext().get() == this);
}
@@ -181,7 +223,7 @@ void
MockTransactionContext::abortComplete(const TransactionAsyncContext* tc)
{
//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush;
- assert(tc->getTransactionContext() == this);
+ assert(tc->getTransactionContext().get() == this);
}
@@ -190,7 +232,7 @@ void
MockTransactionContext::commitComplete(const TransactionAsyncContext* tc)
{
//std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush;
- assert(tc->getTransactionContext() == this);
+ assert(tc->getTransactionContext().get() == this);
}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h
index 2f8dd716f4..3f70b0bfda 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h
@@ -24,17 +24,19 @@
#ifndef tests_storePerftools_asyncPerf_MockTransactionContext_h_
#define tests_storePerftools_asyncPerf_MockTransactionContext_h_
-#include "qpid/broker/AsyncStore.h" // qpid::broker::AsyncResult
#include "qpid/broker/TransactionalStore.h" // qpid::broker::TransactionContext
#include "qpid/broker/TxnHandle.h"
#include "qpid/sys/Mutex.h"
-#include <boost/shared_ptr.hpp>
#include <deque>
namespace qpid {
namespace asyncStore {
class AsyncStoreImpl;
+}
+namespace broker {
+class AsyncResult;
+class BrokerAsyncContext;
}}
namespace tests {
@@ -47,14 +49,14 @@ class TransactionAsyncContext;
class MockTransactionContext : public qpid::broker::TransactionContext
{
public:
- typedef boost::shared_ptr<MockTransactionContext> shared_ptr;
-
+ MockTransactionContext(const std::string& xid = std::string());
MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store,
const std::string& xid = std::string());
virtual ~MockTransactionContext();
static void handleAsyncResult(const qpid::broker::AsyncResult* res,
qpid::broker::BrokerAsyncContext* bc);
+ const qpid::broker::TxnHandle& getHandle() const;
qpid::broker::TxnHandle& getHandle();
bool is2pc() const;
const std::string& getXid() const;
@@ -65,6 +67,8 @@ public:
void commit();
protected:
+ std::string m_xid;
+ bool m_tpcFlag;
qpid::asyncStore::AsyncStoreImpl* m_store;
qpid::broker::TxnHandle m_txnHandle;
bool m_prepared;
@@ -72,6 +76,7 @@ protected:
qpid::sys::Mutex m_enqueuedMsgsMutex;
void localPrepare();
+ void setLocalXid();
// --- Ascnc op completions (called through handleAsyncResult) ---
void prepareComplete(const TransactionAsyncContext* tc);
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index 7387c348fd..184a899570 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -23,6 +23,8 @@
#include "PerfTest.h"
+#include "MessageConsumer.h"
+#include "MessageProducer.h"
#include "MockPersistableQueue.h"
#include "tests/storePerftools/version.h"
@@ -30,6 +32,7 @@
#include "tests/storePerftools/common/Thread.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/sys/Poller.h"
#include <iomanip>
@@ -56,8 +59,9 @@ PerfTest::~PerfTest()
m_pollingThread.join();
m_queueList.clear();
+ m_queueList.clear();
+ m_producers.clear();
- if (m_store) delete m_store;
delete[] m_msgData;
}
@@ -69,13 +73,21 @@ PerfTest::prepareStore()
}
void
+PerfTest::destroyStore()
+{
+ if (m_store) {
+ delete m_store;
+ }
+}
+
+void
PerfTest::prepareQueues()
{
for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
std::ostringstream qname;
qname << "queue_" << std::setw(4) << std::setfill('0') << i;
- MockPersistableQueue::intrusive_ptr mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_testOpts, m_msgData));
- mpq->asyncStoreCreate();
+ boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store));
+ mpq->asyncCreate();
m_queueList.push_back(mpq);
}
}
@@ -83,32 +95,38 @@ PerfTest::prepareQueues()
void
PerfTest::destroyQueues()
{
- for (std::deque<MockPersistableQueue::intrusive_ptr>::iterator i=m_queueList.begin(); i!=m_queueList.end(); ++i) {
- (*i)->asyncStoreDestroy();
+ while (m_queueList.size() > 0) {
+ m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion);
+ m_queueList.pop_front();
}
}
void
PerfTest::run()
{
- typedef boost::shared_ptr<tests::storePerftools::common::Thread> ThreadPtr; // TODO - replace with qpid threads
-
- prepareStore();
+ if (m_testOpts.m_durable) {
+ prepareStore();
+ }
prepareQueues();
- std::deque<ThreadPtr> threads;
+ // TODO: replace with qpid::sys::Thread
+ std::deque<boost::shared_ptr<tests::storePerftools::common::Thread> > threads;
{ // --- Start of timed section ---
tests::storePerftools::common::ScopedTimer st(m_testResult);
for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) {
+ boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts, m_msgData, m_store, m_queueList[q]));
+ m_producers.push_back(mp);
for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads
- ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startEnqueues,
- reinterpret_cast<void*>(m_queueList[q].get())));
+ boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mp->startProducers,
+ reinterpret_cast<void*>(mp.get())));
threads.push_back(tp);
}
+ boost::shared_ptr<MessageConsumer> mc(new MessageConsumer(m_testOpts, m_queueList[q]));
+ m_consumers.push_back(mc);
for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads
- ThreadPtr tp(new tests::storePerftools::common::Thread(m_queueList[q]->startDequeues,
- reinterpret_cast<void*>(m_queueList[q].get())));
+ boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mc->startConsumers,
+ reinterpret_cast<void*>(mc.get())));
threads.push_back(tp);
}
}
@@ -117,8 +135,8 @@ PerfTest::run()
threads.pop_front();
}
} // --- End of timed section ---
- // TODO: Add test param to allow queues to be destroyed or left when test ends
destroyQueues();
+ destroyStore();
}
void
@@ -172,6 +190,6 @@ main(int argc, char** argv)
// Print test result
std::cout << apt << std::endl;
- ::sleep(1);
+ //::sleep(1);
return 0;
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
index 2b1e65f871..3bd3f6bd32 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
@@ -24,13 +24,11 @@
#ifndef tests_storePerftools_asyncPerf_PerfTest_h_
#define tests_storePerftools_asyncPerf_PerfTest_h_
-#include "MockPersistableQueue.h"
#include "TestResult.h"
#include "tests/storePerftools/common/Streamable.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/sys/Poller.h"
#include "qpid/sys/Thread.h"
#include <boost/shared_ptr.hpp>
@@ -40,12 +38,18 @@ namespace qpid {
namespace asyncStore {
class AsyncStoreImpl;
class AsyncStoreOptions;
+}
+namespace sys {
+class Poller;
}}
namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class MockPersistableQueue;
+class MessageConsumer;
+class MessageProducer;
class TestOptions;
class PerfTest : public tests::storePerftools::common::Streamable
@@ -66,9 +70,12 @@ protected:
boost::shared_ptr<qpid::sys::Poller> m_poller;
qpid::sys::Thread m_pollingThread;
qpid::asyncStore::AsyncStoreImpl* m_store;
- std::deque<MockPersistableQueue::intrusive_ptr> m_queueList;
+ std::deque<boost::shared_ptr<MockPersistableQueue> > m_queueList;
+ std::deque<boost::shared_ptr<MessageProducer> > m_producers;
+ std::deque<boost::shared_ptr<MessageConsumer> > m_consumers;
void prepareStore();
+ void destroyStore();
void prepareQueues();
void destroyQueues();
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
index 281fc03e2c..be0c087390 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.cpp
@@ -23,17 +23,29 @@
#include "QueueAsyncContext.h"
+#include <cassert>
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-QueueAsyncContext::QueueAsyncContext(MockPersistableQueue::intrusive_ptr q,
- const qpid::asyncStore::AsyncOperation::opCode op) :
- qpid::broker::BrokerAsyncContext(),
+QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q,
+ const qpid::asyncStore::AsyncOperation::opCode op) :
+ m_q(q),
+ m_op(op)
+{
+ assert(m_q.get() != 0);
+}
+
+QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q,
+ boost::shared_ptr<MockPersistableMessage> msg,
+ const qpid::asyncStore::AsyncOperation::opCode op) :
m_q(q),
+ m_msg(msg),
m_op(op)
{
assert(m_q.get() != 0);
+ assert(m_msg.get() != 0);
}
QueueAsyncContext::~QueueAsyncContext()
@@ -51,12 +63,18 @@ QueueAsyncContext::getOpStr() const
return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
}
-MockPersistableQueue::intrusive_ptr
+boost::shared_ptr<MockPersistableQueue>
QueueAsyncContext::getQueue() const
{
return m_q;
}
+boost::shared_ptr<MockPersistableMessage>
+QueueAsyncContext::getMessage() const
+{
+ return m_msg;
+}
+
void
QueueAsyncContext::destroy()
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
index 657e80694a..2b6b3778cd 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueueAsyncContext.h
@@ -24,26 +24,36 @@
#ifndef tests_storePerftools_asyncPerf_QueueContext_h_
#define tests_storePerftools_asyncPerf_QueueContext_h_
-#include "MockPersistableQueue.h"
#include "qpid/asyncStore/AsyncOperation.h"
-#include "qpid/broker/BrokerAsyncContext.h"
+#include "qpid/broker/AsyncStore.h"
+
+#include <boost/shared_ptr.hpp>
namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class MockPersistableMessage;
+class MockPersistableQueue;
+
class QueueAsyncContext: public qpid::broker::BrokerAsyncContext
{
public:
- QueueAsyncContext(MockPersistableQueue::intrusive_ptr q,
- const qpid::asyncStore::AsyncOperation::opCode op);
+ QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q,
+ const qpid::asyncStore::AsyncOperation::opCode op);
+ QueueAsyncContext(boost::shared_ptr<MockPersistableQueue> q,
+ boost::shared_ptr<MockPersistableMessage> msg,
+ const qpid::asyncStore::AsyncOperation::opCode op);
virtual ~QueueAsyncContext();
qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
const char* getOpStr() const;
- MockPersistableQueue::intrusive_ptr getQueue() const;
+ boost::shared_ptr<MockPersistableQueue> getQueue() const;
+ boost::shared_ptr<MockPersistableMessage> getMessage() const;
void destroy();
+
protected:
- MockPersistableQueue::intrusive_ptr m_q;
+ boost::shared_ptr<MockPersistableQueue> m_q;
+ boost::shared_ptr<MockPersistableMessage> m_msg;
const qpid::asyncStore::AsyncOperation::opCode m_op;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
index 9e5e131a28..7903d6551a 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
@@ -23,20 +23,71 @@
#include "QueuedMessage.h"
-#include "MockTransactionContext.h"
+#include "MockPersistableMessage.h"
+#include "MockPersistableQueue.h"
+
+#include "qpid/asyncStore/AsyncStoreImpl.h"
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-QueuedMessage::QueuedMessage(MockPersistableMessage::shared_ptr msg,
+QueuedMessage::QueuedMessage() :
+ m_queue(0)
+{}
+
+QueuedMessage::QueuedMessage(MockPersistableQueue* q,
+ boost::shared_ptr<MockPersistableMessage> msg) :
+ m_queue(q),
+ m_msg(msg),
+ m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()))
+{}
+
+QueuedMessage::QueuedMessage(const QueuedMessage& qm) :
+ m_queue(qm.m_queue),
+ m_msg(qm.m_msg),
+ m_enqHandle(qm.m_enqHandle)
+{}
+
+QueuedMessage::~QueuedMessage()
+{}
+
+QueuedMessage&
+QueuedMessage::operator=(const QueuedMessage& rhs)
+{
+ m_queue = rhs.m_queue;
+ m_msg = rhs.m_msg;
+ m_enqHandle = rhs.m_enqHandle;
+ return *this;
+}
+
+boost::shared_ptr<MockPersistableMessage>
+QueuedMessage::payload() const
+{
+ return m_msg;
+}
+
+const qpid::broker::EnqueueHandle&
+QueuedMessage::enqHandle() const
+{
+ return m_enqHandle;
+}
+
+qpid::broker::EnqueueHandle&
+QueuedMessage::enqHandle()
+{
+ return m_enqHandle;
+}
+
+/*
+QueuedMessage::QueuedMessage(boost::shared_ptr<MockPersistableMessage> msg,
qpid::broker::EnqueueHandle& enqHandle,
- MockTransactionContextPtr txn) :
+ boost::shared_ptr<MockTransactionContext> txn) :
m_msg(msg),
m_enqHandle(enqHandle),
m_txn(txn)
{
- if (txn) {
+ if (txn.get()) {
txn->addEnqueuedMsg(this);
}
}
@@ -44,7 +95,7 @@ QueuedMessage::QueuedMessage(MockPersistableMessage::shared_ptr msg,
QueuedMessage::~QueuedMessage()
{}
-MockPersistableMessage::shared_ptr
+boost::shared_ptr<MockPersistableMessage>
QueuedMessage::getMessage() const
{
return m_msg;
@@ -56,7 +107,7 @@ QueuedMessage::getEnqueueHandle() const
return m_enqHandle;
}
-MockTransactionContextPtr
+boost::shared_ptr<MockTransactionContext>
QueuedMessage::getTransactionContext() const
{
return m_txn;
@@ -73,5 +124,6 @@ QueuedMessage::clearTransaction()
{
m_txn.reset(static_cast<MockTransactionContext*>(0));
}
+*/
}}} // namespace tests::storePerfTools
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
index a11b23888a..9ad67cc925 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
@@ -24,35 +24,33 @@
#ifndef tests_storePerftools_asyncPerf_QueuedMessage_h_
#define tests_storePerftools_asyncPerf_QueuedMessage_h_
-#include "MockPersistableMessage.h"
-
#include "qpid/broker/EnqueueHandle.h"
+
#include <boost/shared_ptr.hpp>
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class MockTransactionContext;
-typedef boost::shared_ptr<MockTransactionContext> MockTransactionContextPtr;
+class MockPersistableMessage;
+class MockPersistableQueue;
class QueuedMessage
{
public:
- QueuedMessage(MockPersistableMessage::shared_ptr msg,
- qpid::broker::EnqueueHandle& enqHandle,
- MockTransactionContextPtr txn);
- virtual ~QueuedMessage();
- MockPersistableMessage::shared_ptr getMessage() const;
- qpid::broker::EnqueueHandle getEnqueueHandle() const;
- MockTransactionContextPtr getTransactionContext() const;
- bool isTransactional() const;
- void clearTransaction();
-
+ QueuedMessage();
+ QueuedMessage(MockPersistableQueue* q,
+ boost::shared_ptr<MockPersistableMessage> msg);
+ QueuedMessage(const QueuedMessage& qm);
+ ~QueuedMessage();
+ QueuedMessage& operator=(const QueuedMessage& rhs);
+ boost::shared_ptr<MockPersistableMessage> payload() const;
+ const qpid::broker::EnqueueHandle& enqHandle() const;
+ qpid::broker::EnqueueHandle& enqHandle();
protected:
- MockPersistableMessage::shared_ptr m_msg;
+ MockPersistableQueue* m_queue;
+ boost::shared_ptr<MockPersistableMessage> m_msg;
qpid::broker::EnqueueHandle m_enqHandle;
- MockTransactionContextPtr m_txn;
};
}}} // namespace tests::storePerfTools
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
index 27784ef661..2f4461e8b5 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
@@ -30,11 +30,15 @@ namespace asyncPerf {
// static declarations
uint16_t TestOptions::s_defaultEnqTxnBlkSize = 0;
uint16_t TestOptions::s_defaultDeqTxnBlkSize = 0;
+bool TestOptions::s_defaultDurable = false;
+bool TestOptions::s_defaultDestroyQueuesOnCompletion = false;
TestOptions::TestOptions(const std::string& name) :
tests::storePerftools::common::TestOptions(name),
m_enqTxnBlockSize(s_defaultEnqTxnBlkSize),
- m_deqTxnBlockSize(s_defaultDeqTxnBlkSize)
+ m_deqTxnBlockSize(s_defaultDeqTxnBlkSize),
+ m_durable(s_defaultDurable),
+ m_destroyQueuesOnCompletion(s_defaultDestroyQueuesOnCompletion)
{
doAddOptions();
}
@@ -46,10 +50,14 @@ TestOptions::TestOptions(const uint32_t numMsgs,
const uint16_t numDeqThreadsPerQueue,
const uint16_t enqTxnBlockSize,
const uint16_t deqTxnBlockSize,
+ const bool durable,
+ const bool destroyQueuesOnCompletion,
const std::string& name) :
tests::storePerftools::common::TestOptions(numMsgs, msgSize, numQueues, numEnqThreadsPerQueue, numDeqThreadsPerQueue, name),
m_enqTxnBlockSize(enqTxnBlockSize),
- m_deqTxnBlockSize(deqTxnBlockSize)
+ m_deqTxnBlockSize(deqTxnBlockSize),
+ m_durable(durable),
+ m_destroyQueuesOnCompletion(destroyQueuesOnCompletion)
{
doAddOptions();
}
@@ -63,6 +71,8 @@ TestOptions::printVals(std::ostream& os) const
tests::storePerftools::common::TestOptions::printVals(os);
os << " Num enqueus per transaction [-t, --enq-txn-size]: " << m_enqTxnBlockSize << std::endl;
os << " Num dequeues per transaction [-d, --deq-txn-size]: " << m_deqTxnBlockSize << std::endl;
+ os << " Durable [--durable]: " << (m_durable ? "true" : "false") << std::endl;
+ os << " Destroy queues on test completion [--destroy-queues]: " << (m_destroyQueuesOnCompletion ? "true" : "false") << std::endl;
}
void
@@ -73,6 +83,10 @@ TestOptions::doAddOptions()
"Num enqueus per transaction (0 = no transactions)")
("deq-txn-size,d", qpid::optValue(m_deqTxnBlockSize, "N"),
"Num dequeues per transaction (0 = no transactions)")
+ ("durable", qpid::optValue(m_durable),
+ "Queues and messages are durable")
+ ("destroy-queues", qpid::optValue(m_destroyQueuesOnCompletion),
+ "Destroy queue recoreds persistent store on test completion")
;
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h
index 76b18717fa..b7e1c0a7a8 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.h
@@ -41,16 +41,22 @@ public:
const uint16_t numDeqThreadsPerQueue,
const uint16_t enqTxnBlockSize,
const uint16_t deqTxnBlockSize,
+ const bool durable,
+ const bool destroyQueuesOnCompletion,
const std::string& name="Test Options");
virtual ~TestOptions();
void printVals(std::ostream& os) const;
uint16_t m_enqTxnBlockSize; ///< Transaction block size for enqueues
uint16_t m_deqTxnBlockSize; ///< Transaction block size for dequeues
+ bool m_durable; ///< Use durable queues and messages for test
+ bool m_destroyQueuesOnCompletion; ///< Destroy durable queues on completion of test
protected:
static uint16_t s_defaultEnqTxnBlkSize; ///< Default transaction block size for enqueues
static uint16_t s_defaultDeqTxnBlkSize; ///< Default transaction block size for dequeues
+ static bool s_defaultDurable; ///< Default flag for using durable queues and messages for test
+ static bool s_defaultDestroyQueuesOnCompletion; ///< Default flag for destroying queues on completion of test
void doAddOptions();
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestResult.h b/cpp/src/tests/storePerftools/asyncPerf/TestResult.h
index 1b831c3e17..dc491b074d 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TestResult.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/TestResult.h
@@ -32,15 +32,13 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class TestOptions;
-
/**
- * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in the journal.
+ * \brief Results class that accepts an elapsed time to calculate the rate of message throughput in a test.
*
* This class (being subclassed from ScopedTimable) is passed to a ScopedTimer object on construction, and the
* inherited _elapsed member will be written with the calculated elapsed time (in seconds) on destruction of the
* ScopedTimer object. This time (initially set to 0.0) is used to calculate message and message byte throughput.
- * The message number and size information comes from the JrnlPerfTestParameters object passed to the constructor.
+ * The message number and size information comes from the TestOptions object passed to the constructor.
*
* Results are available through the use of toStream(), toString() or the << operators.
*
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp
index 118707b3d3..51a5791403 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.cpp
@@ -1,16 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TransactionAsyncContext.cpp
+ */
+
#include "TransactionAsyncContext.h"
+#include <cassert>
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-TransactionAsyncContext::TransactionAsyncContext(MockTransactionContext* tc,
+TransactionAsyncContext::TransactionAsyncContext(boost::shared_ptr<MockTransactionContext> tc,
const qpid::asyncStore::AsyncOperation::opCode op):
- qpid::broker::BrokerAsyncContext(),
m_tc(tc),
m_op(op)
{
- assert(tc != 0);
+ assert(m_tc.get() != 0);
}
TransactionAsyncContext::~TransactionAsyncContext()
@@ -28,7 +52,7 @@ TransactionAsyncContext::getOpStr() const
return qpid::asyncStore::AsyncOperation::getOpStr(m_op);
}
-MockTransactionContext*
+boost::shared_ptr<MockTransactionContext>
TransactionAsyncContext::getTransactionContext() const
{
return m_tc;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h
index 3bce23046e..2b4f189f41 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/TransactionAsyncContext.h
@@ -1,25 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * \file TransactionAsyncContext.h
+ */
+
#ifndef tests_storePerftools_asyncPerf_TransactionAsyncContext_h_
#define tests_storePerftools_asyncPerf_TransactionAsyncContext_h_
-#include "MockTransactionContext.h"
#include "qpid/asyncStore/AsyncOperation.h"
-#include "qpid/broker/BrokerAsyncContext.h"
+#include "qpid/broker/AsyncStore.h" // qpid::broker::BrokerAsyncContext
+
+#include <boost/shared_ptr.hpp>
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-class TransactionAsyncContext: public qpid::broker::BrokerAsyncContext {
+class MockTransactionContext;
+
+class TransactionAsyncContext: public qpid::broker::BrokerAsyncContext
+{
public:
- TransactionAsyncContext(MockTransactionContext* tc,
+ TransactionAsyncContext(boost::shared_ptr<MockTransactionContext> tc,
const qpid::asyncStore::AsyncOperation::opCode op);
virtual ~TransactionAsyncContext();
qpid::asyncStore::AsyncOperation::opCode getOpCode() const;
const char* getOpStr() const;
- MockTransactionContext* getTransactionContext() const;
+ boost::shared_ptr<MockTransactionContext> getTransactionContext() const;
void destroy();
+
protected:
- MockTransactionContext* m_tc;
+ boost::shared_ptr<MockTransactionContext> m_tc;
const qpid::asyncStore::AsyncOperation::opCode m_op;
};
diff --git a/cpp/src/tests/storePerftools/common/Thread.h b/cpp/src/tests/storePerftools/common/Thread.h
index bab484dd66..74d25a9da0 100644
--- a/cpp/src/tests/storePerftools/common/Thread.h
+++ b/cpp/src/tests/storePerftools/common/Thread.h
@@ -27,6 +27,8 @@
#include <pthread.h>
#include <string>
+#include <boost/shared_ptr.hpp>
+
namespace tests {
namespace storePerftools {
namespace common {