summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/TxnBuffer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/TxnBuffer.cpp')
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.cpp50
1 files changed, 7 insertions, 43 deletions
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp
index b975f09448..425d725e9e 100644
--- a/cpp/src/qpid/broker/TxnBuffer.cpp
+++ b/cpp/src/qpid/broker/TxnBuffer.cpp
@@ -24,13 +24,10 @@
#include "TxnBuffer.h"
#include "AsyncResultHandle.h"
-#include "AsyncStore.h"
#include "TxnAsyncContext.h"
#include "TxnOp.h"
-#include "qpid/Exception.h"
-
-#include <boost/shared_ptr.hpp>
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -47,7 +44,6 @@ TxnBuffer::~TxnBuffer()
void
TxnBuffer::enlist(boost::shared_ptr<TxnOp> op)
{
-//std::cout << "TTT TxnBuffer::enlist" << std::endl << std::flush;
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
m_ops.push_back(op);
}
@@ -55,7 +51,6 @@ TxnBuffer::enlist(boost::shared_ptr<TxnOp> op)
bool
TxnBuffer::prepare(TxnHandle& th)
{
-//std::cout << "TTT TxnBuffer::prepare" << std::endl << std::flush;
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
if (!(*i)->prepare(th)) {
@@ -68,7 +63,6 @@ TxnBuffer::prepare(TxnHandle& th)
void
TxnBuffer::commit()
{
-//std::cout << "TTT TxnBuffer::commit" << std::endl << std::flush;
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->commit();
@@ -79,7 +73,6 @@ TxnBuffer::commit()
void
TxnBuffer::rollback()
{
-//std::cout << "TTT TxnBuffer::rollback" << std::endl << std::flush;
qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
(*i)->rollback();
@@ -88,17 +81,16 @@ TxnBuffer::rollback()
}
bool
-TxnBuffer::commitLocal(AsyncTransactionalStore* const store)
+TxnBuffer::commitLocal(AsyncTransaction* const store)
{
-//std::cout << "TTT TxnBuffer::commitLocal" << std::endl << std::flush;
if (store) {
try {
m_store = store;
asyncLocalCommit();
} catch (std::exception& e) {
- std::cerr << "Commit failed: " << e.what() << std::endl;
+ QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what());
} catch (...) {
- std::cerr << "Commit failed (unknown exception)" << std::endl;
+ QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)");
}
}
return false;
@@ -111,11 +103,10 @@ TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh)
if (arh) {
boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
if (arh->getErrNo()) {
- std::cerr << "Transaction xid=\"" << tac->getTransactionContext().getXid() << "\": Operation " << tac->getOpStr() << ": failure "
- << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl;
+ QPID_LOG(error, "TxnBuffer::handleAsyncResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
+ << " (" << arh->getErrMsg() << ")");
tac->getTxnBuffer()->asyncLocalAbort();
} else {
-//std::cout << "TTT TxnBuffer::handleAsyncResult() op=" << tac->getOpStr() << std::endl << std::flush;
if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) {
tac->getTxnBuffer()->asyncLocalAbort();
} else {
@@ -131,13 +122,11 @@ TxnBuffer::asyncLocalCommit()
assert(m_store != 0);
switch(m_state) {
case NONE:
-//std::cout << "TTT TxnBuffer::asyncLocalCommit: NONE->PREPARE" << std::endl << std::flush;
m_state = PREPARE;
m_txnHandle = m_store->createTxnHandle(this);
prepare(m_txnHandle);
break;
case PREPARE:
-//std::cout << "TTT TxnBuffer::asyncLocalCommit: PREPARE->COMMIT" << std::endl << std::flush;
m_state = COMMIT;
{
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
@@ -149,16 +138,12 @@ TxnBuffer::asyncLocalCommit()
}
break;
case COMMIT:
-//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMMIT->COMPLETE" << std::endl << std::flush;
commit();
m_state = COMPLETE;
delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
break;
-// case COMPLETE:
-//std::cout << "TTT TxnBuffer:asyncLocalCommit: COMPLETE" << std::endl << std::flush;
- break;
+ case COMPLETE:
default: ;
-//std::cout << "TTT TxnBuffer:asyncLocalCommit: Unexpected state " << m_state << std::endl << std::flush;
}
}
@@ -170,7 +155,6 @@ TxnBuffer::asyncLocalAbort()
case NONE:
case PREPARE:
case COMMIT:
-//std::cout << "TTT TxnBuffer::asyncRollback: xxx->ROLLBACK" << std::endl << std::flush;
m_state = ROLLBACK;
{
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
@@ -182,31 +166,11 @@ TxnBuffer::asyncLocalAbort()
}
break;
case ROLLBACK:
-//std::cout << "TTT TxnBuffer:asyncRollback: ROLLBACK->COMPLETE" << std::endl << std::flush;
rollback();
m_state = COMPLETE;
delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
default: ;
-//std::cout << "TTT TxnBuffer:asyncRollback: Unexpected state " << m_state << std::endl << std::flush;
- }
-}
-
-// for debugging
-/*
-void
-TxnBuffer::printState(std::ostream& os)
-{
- os << "state=";
- switch(m_state) {
- case NONE: os << "NONE"; break;
- case PREPARE: os << "PREPARE"; break;
- case COMMIT: os << "COMMIT"; break;
- case ROLLBACK: os << "ROLLBACK"; break;
- case COMPLETE: os << "COMPLETE"; break;
- default: os << m_state << "(unknown)";
}
- os << "; " << m_ops.size() << "; store=" << m_store;
}
-*/
}} // namespace qpid::broker