summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-07 12:42:37 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-07 12:42:37 +0000
commit22d453646b4815752134ad62e0b27841a103afb2 (patch)
tree152b6447a5c097b9617c10b7309775fc7987f996 /cpp/src/tests
parent45d67efe63abecddf5ca7a68c45f308664bd1466 (diff)
downloadqpid-python-22d453646b4815752134ad62e0b27841a103afb2.tar.gz
QPID-3858: WIP - added AsyncResultQueue for async result return path
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1347588 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp17
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h12
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h4
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp6
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/PerfTest.h2
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp3
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp2
9 files changed, 34 insertions, 16 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp
index e7cab4d621..fc04bc746e 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableMessage.cpp
@@ -34,7 +34,7 @@ MockPersistableMessage::MockPersistableMessage(const char* msgData,
qpid::asyncStore::AsyncStoreImpl* store) :
m_persistenceId(0ULL),
m_msg(msgData, static_cast<size_t>(msgSize)),
- m_msgHandle(store ? store->createMessageHandle(this) : store->createMessageHandle(0))
+ m_msgHandle(store ? store->createMessageHandle(this) : qpid::broker::MessageHandle(0))
{}
MockPersistableMessage::~MockPersistableMessage()
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
index 009f54a157..49d656aee4 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.cpp
@@ -30,6 +30,7 @@
#include "QueuedMessage.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/broker/AsyncResultQueue.h"
namespace tests {
namespace storePerftools {
@@ -37,10 +38,12 @@ namespace asyncPerf {
MockPersistableQueue::MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& /*args*/,
- qpid::asyncStore::AsyncStoreImpl* store) :
+ qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& resultQueue) :
qpid::broker::PersistableQueue(),
m_name(name),
m_store(store),
+ m_resultQueue(resultQueue),
m_asyncOpCounter(0UL),
m_persistenceId(0ULL),
m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this.
@@ -64,6 +67,7 @@ MockPersistableQueue::~MockPersistableQueue()
}
// static
+/*
void
MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
qpid::broker::BrokerAsyncContext* bc)
@@ -102,6 +106,7 @@ MockPersistableQueue::handleAsyncResult(const qpid::broker::AsyncResult* res,
if (bc) delete bc;
if (res) delete res;
}
+*/
const qpid::broker::QueueHandle&
MockPersistableQueue::getHandle() const
@@ -124,10 +129,12 @@ MockPersistableQueue::getStore()
void
MockPersistableQueue::asyncCreate()
{
+ qpid::broker::ResultCallback rcb = &qpid::broker::AsyncResultQueue::submit;
if (m_store) {
m_store->submitCreate(m_queueHandle,
this,
- &handleAsyncResult,
+ rcb,
+// &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
new QueueAsyncContext(shared_from_this(),
qpid::asyncStore::AsyncOperation::QUEUE_CREATE));
++m_asyncOpCounter;
@@ -141,7 +148,7 @@ MockPersistableQueue::asyncDestroy(const bool deleteQueue)
if (m_store) {
if (deleteQueue) {
m_store->submitDestroy(m_queueHandle,
- &handleAsyncResult,
+ &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
new QueueAsyncContext(shared_from_this(),
qpid::asyncStore::AsyncOperation::QUEUE_DESTROY));
++m_asyncOpCounter;
@@ -329,7 +336,7 @@ MockPersistableQueue::asyncEnqueue(MockTransactionContext* txn,
//std::cout << "QQQ Queue=\"" << m_name << "\": asyncEnqueue() rid=0x" << std::hex << qm.payload()->getPersistenceId() << std::dec << std::endl << std::flush;
m_store->submitEnqueue(/*enqHandle*/qm.enqHandle(),
txn->getHandle(),
- &handleAsyncResult,
+ &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
new QueueAsyncContext(shared_from_this(),
qm.payload(),
qpid::asyncStore::AsyncOperation::MSG_ENQUEUE));
@@ -346,7 +353,7 @@ MockPersistableQueue::asyncDequeue(MockTransactionContext* txn,
qpid::broker::EnqueueHandle enqHandle = qm.enqHandle();
m_store->submitDequeue(enqHandle,
txn->getHandle(),
- &handleAsyncResult,
+ &qpid::broker::AsyncResultQueue::submit/*&m_resultQueue.submit*/,
new QueueAsyncContext(shared_from_this(),
qm.payload(),
qpid::asyncStore::AsyncOperation::MSG_DEQUEUE));
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
index ff6db93542..e62aeec420 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockPersistableQueue.h
@@ -38,7 +38,9 @@ namespace qpid {
namespace asyncStore {
class AsyncStoreImpl;
}
-
+namespace broker {
+class AsyncResultQueue;
+}
namespace framing {
class FieldTable;
}}
@@ -61,11 +63,12 @@ class MockPersistableQueue : public boost::enable_shared_from_this<MockPersistab
public:
MockPersistableQueue(const std::string& name,
const qpid::framing::FieldTable& args,
- qpid::asyncStore::AsyncStoreImpl* store);
+ qpid::asyncStore::AsyncStoreImpl* store,
+ qpid::broker::AsyncResultQueue& rq);
virtual ~MockPersistableQueue();
- static void handleAsyncResult(const qpid::broker::AsyncResult* res,
- qpid::broker::BrokerAsyncContext* bc);
+// static void handleAsyncResult(const qpid::broker::AsyncResult* res,
+// qpid::broker::BrokerAsyncContext* bc);
const qpid::broker::QueueHandle& getHandle() const;
qpid::broker::QueueHandle& getHandle();
qpid::asyncStore::AsyncStoreImpl* getStore();
@@ -99,6 +102,7 @@ public:
protected:
const std::string m_name;
qpid::asyncStore::AsyncStoreImpl* m_store;
+ qpid::broker::AsyncResultQueue& m_resultQueue;
AsyncOpCounter m_asyncOpCounter;
mutable uint64_t m_persistenceId;
std::string m_persistableData;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
index c444f596e5..0ac0c7732f 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp
@@ -64,6 +64,7 @@ MockTransactionContext::~MockTransactionContext()
{}
// static
+/*
void
MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res,
qpid::broker::BrokerAsyncContext* bc)
@@ -96,6 +97,7 @@ MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res,
if (bc) delete bc;
if (res) delete res;
}
+*/
const qpid::broker::TxnHandle&
MockTransactionContext::getHandle() const
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h
index 3f70b0bfda..d727caede6 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.h
@@ -53,8 +53,8 @@ public:
MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store,
const std::string& xid = std::string());
virtual ~MockTransactionContext();
- static void handleAsyncResult(const qpid::broker::AsyncResult* res,
- qpid::broker::BrokerAsyncContext* bc);
+// static void handleAsyncResult(const qpid::broker::AsyncResult* res,
+// qpid::broker::BrokerAsyncContext* bc);
const qpid::broker::TxnHandle& getHandle() const;
qpid::broker::TxnHandle& getHandle();
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
index 184a899570..66e0bb3dbf 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
@@ -32,6 +32,7 @@
#include "tests/storePerftools/common/Thread.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
+#include "qpid/broker/AsyncResultQueue.h"
#include "qpid/sys/Poller.h"
#include <iomanip>
@@ -48,6 +49,7 @@ PerfTest::PerfTest(const TestOptions& to,
m_msgData(new char[to.m_msgSize]),
m_poller(new qpid::sys::Poller),
m_pollingThread(m_poller.get()),
+ m_resultQueue(m_poller),
m_store(0)
{
std::memset((void*)m_msgData, 0, (size_t)to.m_msgSize);
@@ -68,7 +70,7 @@ PerfTest::~PerfTest()
void
PerfTest::prepareStore()
{
- m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts);
+ m_store = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts, &m_resultQueue);
m_store->initialize();
}
@@ -86,7 +88,7 @@ PerfTest::prepareQueues()
for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
std::ostringstream qname;
qname << "queue_" << std::setw(4) << std::setfill('0') << i;
- boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store));
+ boost::shared_ptr<MockPersistableQueue> mpq(new MockPersistableQueue(qname.str(), m_queueArgs, m_store, m_resultQueue));
mpq->asyncCreate();
m_queueList.push_back(mpq);
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
index 3bd3f6bd32..46455e4af0 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/PerfTest.h
@@ -28,6 +28,7 @@
#include "tests/storePerftools/common/Streamable.h"
+#include "qpid/broker/AsyncResultQueue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Thread.h"
@@ -69,6 +70,7 @@ protected:
const char* m_msgData;
boost::shared_ptr<qpid::sys::Poller> m_poller;
qpid::sys::Thread m_pollingThread;
+ qpid::broker::AsyncResultQueue m_resultQueue;
qpid::asyncStore::AsyncStoreImpl* m_store;
std::deque<boost::shared_ptr<MockPersistableQueue> > m_queueList;
std::deque<boost::shared_ptr<MessageProducer> > m_producers;
diff --git a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
index 7903d6551a..802279bbf9 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
@@ -27,6 +27,7 @@
#include "MockPersistableQueue.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
+//#include "qpid/broker/EnqueueHandle.h"
namespace tests {
namespace storePerftools {
@@ -40,7 +41,7 @@ QueuedMessage::QueuedMessage(MockPersistableQueue* q,
boost::shared_ptr<MockPersistableMessage> msg) :
m_queue(q),
m_msg(msg),
- m_enqHandle(q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()))
+ m_enqHandle(q->getStore() ? q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()) : qpid::broker::EnqueueHandle(0))
{}
QueuedMessage::QueuedMessage(const QueuedMessage& qm) :
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
index 2f4461e8b5..dccfc4fcbf 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TestOptions.cpp
@@ -86,7 +86,7 @@ TestOptions::doAddOptions()
("durable", qpid::optValue(m_durable),
"Queues and messages are durable")
("destroy-queues", qpid::optValue(m_destroyQueuesOnCompletion),
- "Destroy queue recoreds persistent store on test completion")
+ "Destroy queues in persistent store on test completion")
;
}