summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-06-25 19:11:55 +0000
committerKim van der Riet <kpvdr@apache.org>2012-06-25 19:11:55 +0000
commitcbd4f9c22974db5f53b42a4326486ec8325b79cc (patch)
tree95986a4f10104ea2b9cc79c7463d0bc9ab451bcf
parentb95f9427ede4a2045ac6424a6341de9185a13602 (diff)
downloadqpid-python-cbd4f9c22974db5f53b42a4326486ec8325b79cc.tar.gz
WIP - transactional consume path completed, still some testing to be done.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1353703 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.cpp8
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp40
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h10
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp49
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h5
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp11
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp27
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h5
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp36
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h9
-rw-r--r--cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp12
11 files changed, 175 insertions, 37 deletions
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp
index d6662e5001..b975f09448 100644
--- a/cpp/src/qpid/broker/TxnBuffer.cpp
+++ b/cpp/src/qpid/broker/TxnBuffer.cpp
@@ -111,7 +111,6 @@ TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh)
if (arh) {
boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
if (arh->getErrNo()) {
- tac->getTxnBuffer()->asyncLocalAbort();
std::cerr << "Transaction xid=\"" << tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() << ": failure "
<< arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
tac->getTxnBuffer()->asyncLocalAbort();
@@ -153,7 +152,10 @@ TxnBuffer::asyncLocalCommit()
//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl << std::flush;
commit();
m_state = COMPLETE;
- //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+ delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+ break;
+// case COMPLETE:
+//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMPLETE" << std::endl << std::flush;
break;
default: ;
//std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state << std::endl << std::flush;
@@ -183,7 +185,7 @@ TxnBuffer::asyncLocalAbort()
//std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl << std::flush;
rollback();
m_state = COMPLETE;
- //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+ delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
default: ;
//std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state << std::endl << std::flush;
}
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
index 7a0224a9b5..f89ba22b2a 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
@@ -23,6 +23,7 @@
#include "DeliveryRecord.h"
+#include "MessageConsumer.h"
#include "SimpleMessage.h"
#include "SimpleQueue.h"
@@ -31,8 +32,10 @@ namespace storePerftools {
namespace asyncPerf {
DeliveryRecord::DeliveryRecord(const QueuedMessage& qm,
+ MessageConsumer& mc,
bool accepted) :
m_queuedMessage(qm),
+ m_msgConsumer(mc),
m_accepted(accepted),
m_ended(accepted)
{}
@@ -41,17 +44,29 @@ DeliveryRecord::~DeliveryRecord()
{}
bool
-DeliveryRecord::accept(qpid::broker::TxnHandle* txn)
+DeliveryRecord::accept()
{
if (!m_ended) {
- assert(m_queuedMessage.getQueue());
- m_queuedMessage.getQueue()->dequeue(*txn, m_queuedMessage);
+ m_queuedMessage.getQueue()->dequeue(m_queuedMessage);
m_accepted = true;
setEnded();
}
return isRedundant();
}
+/*
+bool
+DeliveryRecord::accept(qpid::broker::TxnHandle& txn)
+{
+ if (!m_ended) {
+ m_queuedMessage.getQueue()->dequeue(txn, m_queuedMessage);
+ m_accepted = true;
+ setEnded();
+ }
+ return isRedundant();
+}
+*/
+
bool
DeliveryRecord::isAccepted() const
{
@@ -78,5 +93,24 @@ DeliveryRecord::isRedundant() const
return m_ended;
}
+void
+DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn)
+{
+ m_queuedMessage.getQueue()->dequeue(txn, m_queuedMessage);
+}
+
+void
+DeliveryRecord::committed() const
+{
+//std::cout << "DeliveryRecord::committed()" << std::endl << std::flush;
+ m_msgConsumer.dequeueComplete();
+ //m_queuedMessage.getQueue()->dequeueCommitted(m_queuedMessage);
+}
+
+QueuedMessage
+DeliveryRecord::getQueuedMessage() const
+{
+ return m_queuedMessage;
+}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
index 25b5446a5f..ea8eba0468 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
@@ -35,18 +35,26 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class MessageConsumer;
+
class DeliveryRecord {
public:
DeliveryRecord(const QueuedMessage& qm,
+ MessageConsumer& mc,
bool accepted);
virtual ~DeliveryRecord();
- bool accept(qpid::broker::TxnHandle* txn);
+ bool accept();
+// bool accept(qpid::broker::TxnHandle& txn);
bool isAccepted() const;
bool setEnded();
bool isEnded() const;
bool isRedundant() const;
+ void dequeue(qpid::broker::TxnHandle& txn);
+ void committed() const;
+ QueuedMessage getQueuedMessage() const;
private:
QueuedMessage m_queuedMessage;
+ MessageConsumer& m_msgConsumer;
bool m_accepted : 1;
bool m_ended : 1;
};
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
index 1859bde947..3ac6867ce1 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
@@ -23,8 +23,10 @@
#include "MessageConsumer.h"
+#include "DeliveryRecord.h"
#include "SimpleQueue.h"
#include "TestOptions.h"
+#include "TxnAccept.h"
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/broker/TxnBuffer.h"
@@ -48,26 +50,65 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams,
MessageConsumer::~MessageConsumer()
{}
+void
+MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr)
+{
+ // TODO: May need a lock?
+ m_unacked.push_back(dr);
+}
+
+void
+MessageConsumer::dequeueComplete()
+{
+//std::cout << "MessageConsumer::dequeueComplete()" << std::endl << std::flush;
+ // TODO: May need a lock
+ //++m_numMsgs;
+}
+
void*
MessageConsumer::runConsumers()
{
const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
- uint16_t txnCnt = 0U;
+ uint16_t opsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
if (useTxns) {
tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
- uint32_t numMsgs = 0;
+ uint32_t numMsgs = 0UL;
while (numMsgs < m_perfTestParams.m_numMsgs) {
- if (m_queue->dispatch()) {
+ if (m_queue->dispatch(*this)) {
++numMsgs;
+ if (useTxns) {
+ // --- Transactional dequeue ---
+ if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
+ if (m_perfTestParams.m_durable) {
+ boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
+ m_unacked.clear();
+ tb->enlist(ta);
+ tb->commitLocal(m_store);
+ if (numMsgs < m_perfTestParams.m_numMsgs) {
+ tb = new qpid::broker::TxnBuffer(m_resultQueue);
+ }
+ } else {
+ tb->commit();
+ }
+ opsInTxnCnt = 0U;
+ }
+ } else {
+ // --- Non-transactional dequeue ---
+ for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) {
+ (*i)->accept();
+ }
+ m_unacked.clear();
+ //++numMsgs;
+ }
} else {
::usleep(1000); // TODO - replace this poller with condition variable
}
}
- if (txnCnt) {
+ if (opsInTxnCnt) {
if (m_perfTestParams.m_durable) {
tb->commitLocal(m_store);
} else {
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
index 5404fe9f58..e733990cf7 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h
@@ -25,6 +25,7 @@
#define tests_storePerftools_asyncPerf_MessageConsumer_h_
#include "boost/shared_ptr.hpp"
+#include <deque>
namespace qpid {
namespace asyncStore {
@@ -38,6 +39,7 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class DeliveryRecord;
class SimpleQueue;
class TestOptions;
@@ -49,6 +51,8 @@ public:
qpid::broker::AsyncResultQueue& arq,
boost::shared_ptr<SimpleQueue> queue);
virtual ~MessageConsumer();
+ void record(boost::shared_ptr<DeliveryRecord> dr);
+ void dequeueComplete();
void* runConsumers();
static void* startConsumers(void* ptr);
@@ -57,6 +61,7 @@ private:
qpid::asyncStore::AsyncStoreImpl* m_store;
qpid::broker::AsyncResultQueue& m_resultQueue;
boost::shared_ptr<SimpleQueue> m_queue;
+ std::deque<boost::shared_ptr<DeliveryRecord> > m_unacked;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
index 0cab537fb0..7d9aaceb11 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
@@ -56,7 +56,7 @@ void*
MessageProducer::runProducers()
{
const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U;
- uint16_t txnCnt = 0U;
+ uint16_t recsInTxnCnt = 0U;
qpid::broker::TxnBuffer* tb = 0;
if (useTxns) {
tb = new qpid::broker::TxnBuffer(m_resultQueue);
@@ -67,27 +67,26 @@ MessageProducer::runProducers()
boost::shared_ptr<TxnPublish> op(new TxnPublish(msg));
op->deliverTo(m_queue);
tb->enlist(op);
- if (++txnCnt >= m_perfTestParams.m_enqTxnBlockSize) {
+ if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) {
if (m_perfTestParams.m_durable) {
tb->commitLocal(m_store);
// TxnBuffer instance tb carries async state that precludes it being re-used for the next
// transaction until the current commit cycle completes. So use another instance. This
// instance should auto-delete when the async commit cycle completes.
- if (numMsgs<m_perfTestParams.m_numMsgs) {
- //tb = boost::shared_ptr<qpid::broker::TxnBuffer>(new qpid::broker::TxnBuffer(m_resultQueue));
+ if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) {
tb = new qpid::broker::TxnBuffer(m_resultQueue);
}
} else {
tb->commit();
}
- txnCnt = 0U;
+ recsInTxnCnt = 0U;
}
} else {
m_queue->deliver(msg);
}
}
- if (txnCnt) {
+ if (recsInTxnCnt) {
if (m_perfTestParams.m_durable) {
tb->commitLocal(m_store);
} else {
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
index d8b312f011..8bb79367ed 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
@@ -23,6 +23,8 @@
#include "SimpleQueue.h"
+#include "DeliveryRecord.h"
+#include "MessageConsumer.h"
#include "MessageDeque.h"
#include "SimpleMessage.h"
#include "QueueAsyncContext.h"
@@ -63,12 +65,7 @@ SimpleQueue::SimpleQueue(const std::string& name,
}
SimpleQueue::~SimpleQueue()
-{
-// m_store->flush(*this);
- // TODO: Make destroying the store a test parameter
-// m_store->destroy(*this);
-// m_store = 0;
-}
+{}
// static
void
@@ -170,16 +167,24 @@ SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
}
bool
-SimpleQueue::dispatch()
+SimpleQueue::dispatch(MessageConsumer& mc)
{
QueuedMessage qm;
if (m_messages->consume(qm)) {
- return dequeue(s_nullTxnHandle, qm);
+ boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false));
+ mc.record(dr);
+ return true;
}
return false;
}
bool
+SimpleQueue::enqueue(QueuedMessage& qm)
+{
+ return enqueue(s_nullTxnHandle, qm);
+}
+
+bool
SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
@@ -195,6 +200,12 @@ SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
}
bool
+SimpleQueue::dequeue(QueuedMessage& qm)
+{
+ return dequeue(s_nullTxnHandle, qm);
+}
+
+bool
SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm)
{
diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
index bc9dda0d98..59e12b5c93 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
@@ -50,6 +50,7 @@ namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class MessageConsumer;
class Messages;
class SimpleMessage;
class QueueAsyncContext;
@@ -76,9 +77,11 @@ public:
// --- Methods in msg handling path from qpid::Queue ---
void deliver(boost::intrusive_ptr<SimpleMessage> msg);
- bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param
+ bool dispatch(MessageConsumer& mc);
+ bool enqueue(QueuedMessage& qm);
bool enqueue(qpid::broker::TxnHandle& th,
QueuedMessage& qm);
+ bool dequeue(QueuedMessage& qm);
bool dequeue(qpid::broker::TxnHandle& th,
QueuedMessage& qm);
void process(boost::intrusive_ptr<SimpleMessage> msg);
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
index c1d35805a6..7e737ed21a 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
@@ -23,11 +23,14 @@
#include "TxnAccept.h"
+#include "DeliveryRecord.h"
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
-TxnAccept::TxnAccept()
+TxnAccept::TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops) :
+ m_ops(ops.begin(), ops.end())
{}
TxnAccept::~TxnAccept()
@@ -36,17 +39,42 @@ TxnAccept::~TxnAccept()
// --- Interface TxnOp ---
bool
-TxnAccept::prepare(qpid::broker::TxnHandle& /*th*/) throw()
+TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
{
+//std::cout << "TTT TxnAccept::prepare" << std::endl << std::flush;
+ try {
+ for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
+ (*i)->dequeue(th);
+ }
+ } catch (const std::exception& e) {
+ std::cerr << "TxnAccept: Failed to prepare transaction: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "TxnAccept: Failed to prepare transaction: (unknown error)" << std::endl;
+ }
return false;
}
void
TxnAccept::commit() throw()
-{}
+{
+//std::cout << "TTT TxnAccept::commit" << std::endl << std::flush;
+ try {
+ for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) {
+ (*i)->committed();
+ (*i)->setEnded();
+ }
+ //m_ops.clear();
+ } catch (const std::exception& e) {
+ std::cerr << "TxnAccept: Failed to commit transaction: " << e.what() << std::endl;
+ } catch(...) {
+ std::cerr << "TxnAccept: Failed to commit transaction: (unknown error)" << std::endl;
+ }
+}
void
TxnAccept::rollback() throw()
-{}
+{
+//std::cout << "TTT TxnAccept::rollback" << std::endl << std::flush;
+}
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
index f164a4c965..6bc7ff9ccb 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
@@ -26,19 +26,26 @@
#include "qpid/broker/TxnOp.h"
+#include "boost/shared_ptr.hpp"
+#include <deque>
+
namespace tests {
namespace storePerftools {
namespace asyncPerf {
+class DeliveryRecord;
+
class TxnAccept: public qpid::broker::TxnOp {
public:
- TxnAccept();
+ TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops);
virtual ~TxnAccept();
// --- Interface TxnOp ---
bool prepare(qpid::broker::TxnHandle& th) throw();
void commit() throw();
void rollback() throw();
+private:
+ std::deque<boost::shared_ptr<DeliveryRecord> > m_ops;
};
}}} // namespace tests::storePerftools::asyncPerf
diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
index 2927dc60e2..10e48bef82 100644
--- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
+++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
@@ -52,9 +52,9 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
}
return true;
} catch (const std::exception& e) {
- std::cerr << "Failed to prepare transaction: " << e.what() << std::endl;
+ std::cerr << "TxnPublish: Failed to prepare transaction: " << e.what() << std::endl;
} catch (...) {
- std::cerr << "Failed to prepare transaction: (unknown error)" << std::endl;
+ std::cerr << "TxnPublish: Failed to prepare transaction: (unknown error)" << std::endl;
}
return false;
}
@@ -68,9 +68,9 @@ TxnPublish::commit() throw()
(*i)->commitEnqueue();
}
} catch (const std::exception& e) {
- std::cerr << "Failed to commit transaction: " << e.what() << std::endl;
+ std::cerr << "TxnPublish: Failed to commit transaction: " << e.what() << std::endl;
} catch (...) {
- std::cerr << "Failed to commit transaction: (unknown error)" << std::endl;
+ std::cerr << "TxnPublish: Failed to commit transaction: (unknown error)" << std::endl;
}
}
@@ -83,9 +83,9 @@ TxnPublish::rollback() throw()
(*i)->abortEnqueue();
}
} catch (const std::exception& e) {
- std::cerr << "Failed to rollback transaction: " << e.what() << std::endl;
+ std::cerr << "TxnPublish: Failed to rollback transaction: " << e.what() << std::endl;
} catch (...) {
- std::cerr << "Failed to rollback transaction: (unknown error)" << std::endl;
+ std::cerr << "TxnPublish: Failed to rollback transaction: (unknown error)" << std::endl;
}
}