summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/TxnBuffer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/TxnBuffer.cpp')
-rw-r--r--cpp/src/qpid/broker/TxnBuffer.cpp57
1 files changed, 31 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp
index 425d725e9e..ba00293452 100644
--- a/cpp/src/qpid/broker/TxnBuffer.cpp
+++ b/cpp/src/qpid/broker/TxnBuffer.cpp
@@ -96,26 +96,6 @@ TxnBuffer::commitLocal(AsyncTransaction* const store)
return false;
}
-// static
-void
-TxnBuffer::handleAsyncResult(const AsyncResultHandle* const arh)
-{
- if (arh) {
- boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
- if (arh->getErrNo()) {
- QPID_LOG(error, "TxnBuffer::handleAsyncResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
- << " (" << arh->getErrMsg() << ")");
- tac->getTxnBuffer()->asyncLocalAbort();
- } else {
- if (tac->getOpCode() == qpid::asyncStore::AsyncOperation::TXN_ABORT) {
- tac->getTxnBuffer()->asyncLocalAbort();
- } else {
- tac->getTxnBuffer()->asyncLocalCommit();
- }
- }
- }
-}
-
void
TxnBuffer::asyncLocalCommit()
{
@@ -130,10 +110,9 @@ TxnBuffer::asyncLocalCommit()
m_state = COMMIT;
{
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
- m_txnHandle,
- qpid::asyncStore::AsyncOperation::TXN_COMMIT,
- &handleAsyncResult,
+ &handleAsyncCommitResult,
&m_resultQueue));
+ m_store->testOp();
m_store->submitCommit(m_txnHandle, tac);
}
break;
@@ -147,6 +126,21 @@ TxnBuffer::asyncLocalCommit()
}
}
+//static
+void
+TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ QPID_LOG(error, "TxnBuffer::handleAsyncCommitResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
+ << " (" << arh->getErrMsg() << ")");
+ tac->getTxnBuffer()->asyncLocalAbort();
+ } else {
+ tac->getTxnBuffer()->asyncLocalCommit();
+ }
+ }
+}
+
void
TxnBuffer::asyncLocalAbort()
{
@@ -158,9 +152,7 @@ TxnBuffer::asyncLocalAbort()
m_state = ROLLBACK;
{
boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
- m_txnHandle,
- qpid::asyncStore::AsyncOperation::TXN_ABORT,
- &handleAsyncResult,
+ &handleAsyncAbortResult,
&m_resultQueue));
m_store->submitCommit(m_txnHandle, tac);
}
@@ -173,4 +165,17 @@ TxnBuffer::asyncLocalAbort()
}
}
+//static
+void
+TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) {
+ if (arh) {
+ boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
+ if (arh->getErrNo()) {
+ QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
+ << " (" << arh->getErrMsg() << ")");
+ }
+ tac->getTxnBuffer()->asyncLocalAbort();
+ }
+}
+
}} // namespace qpid::broker