diff options
Diffstat (limited to 'cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp')
-rw-r--r-- | cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp | 88 |
1 files changed, 65 insertions, 23 deletions
diff --git a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp index 05f06e95a1..c444f596e5 100644 --- a/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp +++ b/cpp/src/tests/storePerftools/asyncPerf/MockTransactionContext.cpp @@ -23,24 +23,41 @@ #include "MockTransactionContext.h" -#include "QueuedMessage.h" #include "TransactionAsyncContext.h" #include "qpid/asyncStore/AsyncStoreImpl.h" -#include "qpid/broker/BrokerAsyncContext.h" + +#include <uuid/uuid.h> namespace tests { namespace storePerftools { namespace asyncPerf { +MockTransactionContext::MockTransactionContext(const std::string& xid) : + qpid::broker::TransactionContext(), + m_xid(xid), + m_tpcFlag(!xid.empty()), + m_store(0), + m_txnHandle(0), + m_prepared(false), + m_enqueuedMsgs() +{ + if (!m_tpcFlag) { + setLocalXid(); + } +//std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; +} + MockTransactionContext::MockTransactionContext(qpid::asyncStore::AsyncStoreImpl* store, const std::string& xid) : m_store(store), - m_txnHandle(store->createTxnHandle(xid)), m_prepared(false), m_enqueuedMsgs() { //std::cout << "*TXN* begin: xid=" << getXid() << "; 2PC=" << (is2pc()?"T":"F") << std::endl; + if (m_store != 0) { + m_txnHandle = store->createTxnHandle(xid); + } } MockTransactionContext::~MockTransactionContext() @@ -80,6 +97,12 @@ MockTransactionContext::handleAsyncResult(const qpid::broker::AsyncResult* res, if (res) delete res; } +const qpid::broker::TxnHandle& +MockTransactionContext::getHandle() const +{ + return m_txnHandle; +} + qpid::broker::TxnHandle& MockTransactionContext::getHandle() { @@ -89,13 +112,13 @@ MockTransactionContext::getHandle() bool MockTransactionContext::is2pc() const { - return m_txnHandle.is2pc(); + return m_tpcFlag; } const std::string& MockTransactionContext::getXid() const { - return m_txnHandle.getXid(); + return m_xid; } void @@ -108,7 +131,7 @@ MockTransactionContext::addEnqueuedMsg(QueuedMessage* qm) void MockTransactionContext::prepare() { - if (m_txnHandle.is2pc()) { + if (m_tpcFlag) { localPrepare(); m_prepared = true; } @@ -126,9 +149,11 @@ MockTransactionContext::abort() if (!m_prepared) { localPrepare(); } - m_store->submitAbort(m_txnHandle, - &handleAsyncResult, - dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); + if (m_store != 0) { +// m_store->submitAbort(m_txnHandle, +// &handleAsyncResult, +// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_ABORT))); + } //std::cout << "*TXN* abort: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; } @@ -145,9 +170,11 @@ MockTransactionContext::commit() } else { localPrepare(); } - m_store->submitCommit(m_txnHandle, - &handleAsyncResult, - dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); + if (m_store != 0) { +// m_store->submitCommit(m_txnHandle, +// &handleAsyncResult, +// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_COMMIT))); + } //std::cout << "*TXN* commit: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; } @@ -156,23 +183,38 @@ MockTransactionContext::commit() void MockTransactionContext::localPrepare() { - m_store->submitPrepare(m_txnHandle, - &handleAsyncResult, - dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); + if (m_store != 0) { +// m_store->submitPrepare(m_txnHandle, +// &handleAsyncResult, +// dynamic_cast<qpid::broker::BrokerAsyncContext*>(new TransactionAsyncContext(this, qpid::asyncStore::AsyncOperation::TXN_PREPARE))); + } //std::cout << "*TXN* localPrepare: xid=" << m_txnHandle.getXid() << "; 2PC=" << (m_txnHandle.is2pc()?"T":"F") << std::endl; } // protected void -MockTransactionContext::prepareComplete(const TransactionAsyncContext* tc) +MockTransactionContext::setLocalXid() +{ + uuid_t uuid; + // TODO: Valgrind warning: Possible race condition in uuid_generate_random() - is it thread-safe, and if not, does it matter? + // If this race condition affects the randomness of the UUID, then there could be a problem here. + ::uuid_generate_random(uuid); + char uuidStr[37]; // 36-char uuid + trailing '\0' + ::uuid_unparse(uuid, uuidStr); + m_xid.assign(uuidStr); +} + +// protected +void +MockTransactionContext::prepareComplete(const TransactionAsyncContext* /*tc*/) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_enqueuedMsgsMutex); - while (!m_enqueuedMsgs.empty()) { - m_enqueuedMsgs.front()->clearTransaction(); - m_enqueuedMsgs.pop_front(); - } +// while (!m_enqueuedMsgs.empty()) { +// m_enqueuedMsgs.front()->clearTransaction(); +// m_enqueuedMsgs.pop_front(); +// } //std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": prepareComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext() == this); +// assert(tc->getTransactionContext().get() == this); } @@ -181,7 +223,7 @@ void MockTransactionContext::abortComplete(const TransactionAsyncContext* tc) { //std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": abortComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext() == this); + assert(tc->getTransactionContext().get() == this); } @@ -190,7 +232,7 @@ void MockTransactionContext::commitComplete(const TransactionAsyncContext* tc) { //std::cout << "~~~~~ Transaction xid=\"" << tc->m_tc->getXid() << "\": commitComplete()" << std::endl << std::flush; - assert(tc->getTransactionContext() == this); + assert(tc->getTransactionContext().get() == this); } }}} // namespace tests::storePerftools::asyncPerf |