diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-06-25 19:11:55 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-06-25 19:11:55 +0000 |
commit | cbd4f9c22974db5f53b42a4326486ec8325b79cc (patch) | |
tree | 95986a4f10104ea2b9cc79c7463d0bc9ab451bcf | |
parent | b95f9427ede4a2045ac6424a6341de9185a13602 (diff) | |
download | qpid-python-cbd4f9c22974db5f53b42a4326486ec8325b79cc.tar.gz |
WIP - transactional consume path completed, still some testing to be done.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1353703 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 175 insertions, 37 deletions
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp index d6662e5001..b975f09448 100644 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ b/cpp/src/qpid/broker/TxnBuffer.cpp @@ -111,7 +111,6 @@ TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh) if (arh) { boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); if (arh->getErrNo()) { - tac->getTxnBuffer()->asyncLocalAbort(); std::cerr << "Transaction xid=\"" << tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() << ": failure " << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; tac->getTxnBuffer()->asyncLocalAbort(); @@ -153,7 +152,10 @@ TxnBuffer::asyncLocalCommit() //std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl << std::flush; commit(); m_state = COMPLETE; - //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class + delete this; // TODO: ugly! Find a better way to handle the life cycle of this class + break; +// case COMPLETE: +//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMPLETE" << std::endl << std::flush; break; default: ; //std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state << std::endl << std::flush; @@ -183,7 +185,7 @@ TxnBuffer::asyncLocalAbort() //std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl << std::flush; rollback(); m_state = COMPLETE; - //delete this; // TODO: ugly! Find a better way to handle the life cycle of this class + delete this; // TODO: ugly! Find a better way to handle the life cycle of this class default: ; //std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state << std::endl << std::flush; } diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp index 7a0224a9b5..f89ba22b2a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp @@ -23,6 +23,7 @@ #include "DeliveryRecord.h" +#include "MessageConsumer.h" #include "SimpleMessage.h" #include "SimpleQueue.h" @@ -31,8 +32,10 @@ namespace storePerftools { namespace asyncPerf { DeliveryRecord::DeliveryRecord(const QueuedMessage& qm, + MessageConsumer& mc, bool accepted) : m_queuedMessage(qm), + m_msgConsumer(mc), m_accepted(accepted), m_ended(accepted) {} @@ -41,17 +44,29 @@ DeliveryRecord::~DeliveryRecord() {} bool -DeliveryRecord::accept(qpid::broker::TxnHandle* txn) +DeliveryRecord::accept() { if (!m_ended) { - assert(m_queuedMessage.getQueue()); - m_queuedMessage.getQueue()->dequeue(*txn, m_queuedMessage); + m_queuedMessage.getQueue()->dequeue(m_queuedMessage); m_accepted = true; setEnded(); } return isRedundant(); } +/* +bool +DeliveryRecord::accept(qpid::broker::TxnHandle& txn) +{ + if (!m_ended) { + m_queuedMessage.getQueue()->dequeue(txn, m_queuedMessage); + m_accepted = true; + setEnded(); + } + return isRedundant(); +} +*/ + bool DeliveryRecord::isAccepted() const { @@ -78,5 +93,24 @@ DeliveryRecord::isRedundant() const return m_ended; } +void +DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn) +{ + m_queuedMessage.getQueue()->dequeue(txn, m_queuedMessage); +} + +void +DeliveryRecord::committed() const +{ +//std::cout << "DeliveryRecord::committed()" << std::endl << std::flush; + m_msgConsumer.dequeueComplete(); + //m_queuedMessage.getQueue()->dequeueCommitted(m_queuedMessage); +} + +QueuedMessage +DeliveryRecord::getQueuedMessage() const +{ + return m_queuedMessage; +} }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h index 25b5446a5f..ea8eba0468 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h +++ b/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h @@ -35,18 +35,26 @@ namespace tests { namespace storePerftools { namespace asyncPerf { +class MessageConsumer; + class DeliveryRecord { public: DeliveryRecord(const QueuedMessage& qm, + MessageConsumer& mc, bool accepted); virtual ~DeliveryRecord(); - bool accept(qpid::broker::TxnHandle* txn); + bool accept(); +// bool accept(qpid::broker::TxnHandle& txn); bool isAccepted() const; bool setEnded(); bool isEnded() const; bool isRedundant() const; + void dequeue(qpid::broker::TxnHandle& txn); + void committed() const; + QueuedMessage getQueuedMessage() const; private: QueuedMessage m_queuedMessage; + MessageConsumer& m_msgConsumer; bool m_accepted : 1; bool m_ended : 1; }; diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp index 1859bde947..3ac6867ce1 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp @@ -23,8 +23,10 @@ #include "MessageConsumer.h" +#include "DeliveryRecord.h" #include "SimpleQueue.h" #include "TestOptions.h" +#include "TxnAccept.h" #include "qpid/asyncStore/AsyncStoreImpl.h" #include "qpid/broker/TxnBuffer.h" @@ -48,26 +50,65 @@ MessageConsumer::MessageConsumer(const TestOptions& perfTestParams, MessageConsumer::~MessageConsumer() {} +void +MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) +{ + // TODO: May need a lock? + m_unacked.push_back(dr); +} + +void +MessageConsumer::dequeueComplete() +{ +//std::cout << "MessageConsumer::dequeueComplete()" << std::endl << std::flush; + // TODO: May need a lock + //++m_numMsgs; +} + void* MessageConsumer::runConsumers() { const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U; - uint16_t txnCnt = 0U; + uint16_t opsInTxnCnt = 0U; qpid::broker::TxnBuffer* tb = 0; if (useTxns) { tb = new qpid::broker::TxnBuffer(m_resultQueue); } - uint32_t numMsgs = 0; + uint32_t numMsgs = 0UL; while (numMsgs < m_perfTestParams.m_numMsgs) { - if (m_queue->dispatch()) { + if (m_queue->dispatch(*this)) { ++numMsgs; + if (useTxns) { + // --- Transactional dequeue --- + if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) { + if (m_perfTestParams.m_durable) { + boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked)); + m_unacked.clear(); + tb->enlist(ta); + tb->commitLocal(m_store); + if (numMsgs < m_perfTestParams.m_numMsgs) { + tb = new qpid::broker::TxnBuffer(m_resultQueue); + } + } else { + tb->commit(); + } + opsInTxnCnt = 0U; + } + } else { + // --- Non-transactional dequeue --- + for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_unacked.begin(); i != m_unacked.end(); ++i) { + (*i)->accept(); + } + m_unacked.clear(); + //++numMsgs; + } } else { ::usleep(1000); // TODO - replace this poller with condition variable } } - if (txnCnt) { + if (opsInTxnCnt) { if (m_perfTestParams.m_durable) { tb->commitLocal(m_store); } else { diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h index 5404fe9f58..e733990cf7 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.h @@ -25,6 +25,7 @@ #define tests_storePerftools_asyncPerf_MessageConsumer_h_ #include "boost/shared_ptr.hpp" +#include <deque> namespace qpid { namespace asyncStore { @@ -38,6 +39,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { +class DeliveryRecord; class SimpleQueue; class TestOptions; @@ -49,6 +51,8 @@ public: qpid::broker::AsyncResultQueue& arq, boost::shared_ptr<SimpleQueue> queue); virtual ~MessageConsumer(); + void record(boost::shared_ptr<DeliveryRecord> dr); + void dequeueComplete(); void* runConsumers(); static void* startConsumers(void* ptr); @@ -57,6 +61,7 @@ private: qpid::asyncStore::AsyncStoreImpl* m_store; qpid::broker::AsyncResultQueue& m_resultQueue; boost::shared_ptr<SimpleQueue> m_queue; + std::deque<boost::shared_ptr<DeliveryRecord> > m_unacked; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp index 0cab537fb0..7d9aaceb11 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp @@ -56,7 +56,7 @@ void* MessageProducer::runProducers() { const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U; - uint16_t txnCnt = 0U; + uint16_t recsInTxnCnt = 0U; qpid::broker::TxnBuffer* tb = 0; if (useTxns) { tb = new qpid::broker::TxnBuffer(m_resultQueue); @@ -67,27 +67,26 @@ MessageProducer::runProducers() boost::shared_ptr<TxnPublish> op(new TxnPublish(msg)); op->deliverTo(m_queue); tb->enlist(op); - if (++txnCnt >= m_perfTestParams.m_enqTxnBlockSize) { + if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) { if (m_perfTestParams.m_durable) { tb->commitLocal(m_store); // TxnBuffer instance tb carries async state that precludes it being re-used for the next // transaction until the current commit cycle completes. So use another instance. This // instance should auto-delete when the async commit cycle completes. - if (numMsgs<m_perfTestParams.m_numMsgs) { - //tb = boost::shared_ptr<qpid::broker::TxnBuffer>(new qpid::broker::TxnBuffer(m_resultQueue)); + if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) { tb = new qpid::broker::TxnBuffer(m_resultQueue); } } else { tb->commit(); } - txnCnt = 0U; + recsInTxnCnt = 0U; } } else { m_queue->deliver(msg); } } - if (txnCnt) { + if (recsInTxnCnt) { if (m_perfTestParams.m_durable) { tb->commitLocal(m_store); } else { diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp index d8b312f011..8bb79367ed 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp @@ -23,6 +23,8 @@ #include "SimpleQueue.h" +#include "DeliveryRecord.h" +#include "MessageConsumer.h" #include "MessageDeque.h" #include "SimpleMessage.h" #include "QueueAsyncContext.h" @@ -63,12 +65,7 @@ SimpleQueue::SimpleQueue(const std::string& name, } SimpleQueue::~SimpleQueue() -{ -// m_store->flush(*this); - // TODO: Make destroying the store a test parameter -// m_store->destroy(*this); -// m_store = 0; -} +{} // static void @@ -170,16 +167,24 @@ SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) } bool -SimpleQueue::dispatch() +SimpleQueue::dispatch(MessageConsumer& mc) { QueuedMessage qm; if (m_messages->consume(qm)) { - return dequeue(s_nullTxnHandle, qm); + boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false)); + mc.record(dr); + return true; } return false; } bool +SimpleQueue::enqueue(QueuedMessage& qm) +{ + return enqueue(s_nullTxnHandle, qm); +} + +bool SimpleQueue::enqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { @@ -195,6 +200,12 @@ SimpleQueue::enqueue(qpid::broker::TxnHandle& th, } bool +SimpleQueue::dequeue(QueuedMessage& qm) +{ + return dequeue(s_nullTxnHandle, qm); +} + +bool SimpleQueue::dequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm) { diff --git a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h index bc9dda0d98..59e12b5c93 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h +++ b/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h @@ -50,6 +50,7 @@ namespace tests { namespace storePerftools { namespace asyncPerf { +class MessageConsumer; class Messages; class SimpleMessage; class QueueAsyncContext; @@ -76,9 +77,11 @@ public: // --- Methods in msg handling path from qpid::Queue --- void deliver(boost::intrusive_ptr<SimpleMessage> msg); - bool dispatch(); // similar to qpid::broker::Queue::distpatch(Consumer&) but without Consumer param + bool dispatch(MessageConsumer& mc); + bool enqueue(QueuedMessage& qm); bool enqueue(qpid::broker::TxnHandle& th, QueuedMessage& qm); + bool dequeue(QueuedMessage& qm); bool dequeue(qpid::broker::TxnHandle& th, QueuedMessage& qm); void process(boost::intrusive_ptr<SimpleMessage> msg); diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp index c1d35805a6..7e737ed21a 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp @@ -23,11 +23,14 @@ #include "TxnAccept.h" +#include "DeliveryRecord.h" + namespace tests { namespace storePerftools { namespace asyncPerf { -TxnAccept::TxnAccept() +TxnAccept::TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops) : + m_ops(ops.begin(), ops.end()) {} TxnAccept::~TxnAccept() @@ -36,17 +39,42 @@ TxnAccept::~TxnAccept() // --- Interface TxnOp --- bool -TxnAccept::prepare(qpid::broker::TxnHandle& /*th*/) throw() +TxnAccept::prepare(qpid::broker::TxnHandle& th) throw() { +//std::cout << "TTT TxnAccept::prepare" << std::endl << std::flush; + try { + for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->dequeue(th); + } + } catch (const std::exception& e) { + std::cerr << "TxnAccept: Failed to prepare transaction: " << e.what() << std::endl; + } catch (...) { + std::cerr << "TxnAccept: Failed to prepare transaction: (unknown error)" << std::endl; + } return false; } void TxnAccept::commit() throw() -{} +{ +//std::cout << "TTT TxnAccept::commit" << std::endl << std::flush; + try { + for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { + (*i)->committed(); + (*i)->setEnded(); + } + //m_ops.clear(); + } catch (const std::exception& e) { + std::cerr << "TxnAccept: Failed to commit transaction: " << e.what() << std::endl; + } catch(...) { + std::cerr << "TxnAccept: Failed to commit transaction: (unknown error)" << std::endl; + } +} void TxnAccept::rollback() throw() -{} +{ +//std::cout << "TTT TxnAccept::rollback" << std::endl << std::flush; +} }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h index f164a4c965..6bc7ff9ccb 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h @@ -26,19 +26,26 @@ #include "qpid/broker/TxnOp.h" +#include "boost/shared_ptr.hpp" +#include <deque> + namespace tests { namespace storePerftools { namespace asyncPerf { +class DeliveryRecord; + class TxnAccept: public qpid::broker::TxnOp { public: - TxnAccept(); + TxnAccept(std::deque<boost::shared_ptr<DeliveryRecord> >& ops); virtual ~TxnAccept(); // --- Interface TxnOp --- bool prepare(qpid::broker::TxnHandle& th) throw(); void commit() throw(); void rollback() throw(); +private: + std::deque<boost::shared_ptr<DeliveryRecord> > m_ops; }; }}} // namespace tests::storePerftools::asyncPerf diff --git a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp index 2927dc60e2..10e48bef82 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp @@ -52,9 +52,9 @@ TxnPublish::prepare(qpid::broker::TxnHandle& th) throw() } return true; } catch (const std::exception& e) { - std::cerr << "Failed to prepare transaction: " << e.what() << std::endl; + std::cerr << "TxnPublish: Failed to prepare transaction: " << e.what() << std::endl; } catch (...) { - std::cerr << "Failed to prepare transaction: (unknown error)" << std::endl; + std::cerr << "TxnPublish: Failed to prepare transaction: (unknown error)" << std::endl; } return false; } @@ -68,9 +68,9 @@ TxnPublish::commit() throw() (*i)->commitEnqueue(); } } catch (const std::exception& e) { - std::cerr << "Failed to commit transaction: " << e.what() << std::endl; + std::cerr << "TxnPublish: Failed to commit transaction: " << e.what() << std::endl; } catch (...) { - std::cerr << "Failed to commit transaction: (unknown error)" << std::endl; + std::cerr << "TxnPublish: Failed to commit transaction: (unknown error)" << std::endl; } } @@ -83,9 +83,9 @@ TxnPublish::rollback() throw() (*i)->abortEnqueue(); } } catch (const std::exception& e) { - std::cerr << "Failed to rollback transaction: " << e.what() << std::endl; + std::cerr << "TxnPublish: Failed to rollback transaction: " << e.what() << std::endl; } catch (...) { - std::cerr << "Failed to rollback transaction: (unknown error)" << std::endl; + std::cerr << "TxnPublish: Failed to rollback transaction: (unknown error)" << std::endl; } } |