diff options
Diffstat (limited to 'cpp/src/qpid/broker/TxnBuffer.cpp')
-rw-r--r-- | cpp/src/qpid/broker/TxnBuffer.cpp | 57 |
1 files changed, 31 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp index 425d725e9e..ba00293452 100644 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ b/cpp/src/qpid/broker/TxnBuffer.cpp @@ -96,26 +96,6 @@ TxnBuffer::commitLocal(AsyncTransaction* const store) return false; } -// static -void -TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh) -{ - if (arh) { - boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - QPID_LOG(error, "TxnBuffer::handleAsyncResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() - << " (" << arh->getErrMsg() << ")"); - tac->getTxnBuffer()->asyncLocalAbort(); - } else { - if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) { - tac->getTxnBuffer()->asyncLocalAbort(); - } else { - tac->getTxnBuffer()->asyncLocalCommit(); - } - } - } -} - void TxnBuffer::asyncLocalCommit() { @@ -130,10 +110,9 @@ TxnBuffer::asyncLocalCommit() m_state = COMMIT; { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, - m_txnHandle, - qpid::asyncStore::AsyncOperation::TXN_COMMIT, - &handleAsyncResult, + &handleAsyncCommitResult, &m_resultQueue)); + m_store->testOp(); m_store->submitCommit(m_txnHandle, tac); } break; @@ -147,6 +126,21 @@ TxnBuffer::asyncLocalCommit() } } +//static +void +TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + QPID_LOG(error, "TxnBuffer::handleAsyncCommitResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() + << " (" << arh->getErrMsg() << ")"); + tac->getTxnBuffer()->asyncLocalAbort(); + } else { + tac->getTxnBuffer()->asyncLocalCommit(); + } + } +} + void TxnBuffer::asyncLocalAbort() { @@ -158,9 +152,7 @@ TxnBuffer::asyncLocalAbort() m_state = ROLLBACK; { boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this, - m_txnHandle, - qpid::asyncStore::AsyncOperation::TXN_ABORT, - &handleAsyncResult, + &handleAsyncAbortResult, &m_resultQueue)); m_store->submitCommit(m_txnHandle, tac); } @@ -173,4 +165,17 @@ TxnBuffer::asyncLocalAbort() } } +//static +void +TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() + << " (" << arh->getErrMsg() << ")"); + } + tac->getTxnBuffer()->asyncLocalAbort(); + } +} + }} // namespace qpid::broker |