summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-11-20 15:23:08 +0000
committerGordon Sim <gsim@apache.org>2014-11-20 15:23:08 +0000
commit5d4c5f8b7a42ff9ab117be2c56a1e33b7165e8f4 (patch)
tree547d75951481a385e639c93886831cd21cc51f64
parent5be63610c4e7d07ea64b29965a97562dc2d6f4a7 (diff)
downloadqpid-python-5d4c5f8b7a42ff9ab117be2c56a1e33b7165e8f4.tar.gz
QPID-4710: release messages on rollback
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1640755 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/amqp/descriptors.h2
-rw-r--r--qpid/cpp/src/qpid/broker/TxDequeue.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/TxDequeue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp1
4 files changed, 9 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h
index 2cdaaf9202..a9ee12644a 100644
--- a/qpid/cpp/src/qpid/amqp/descriptors.h
+++ b/qpid/cpp/src/qpid/amqp/descriptors.h
@@ -127,7 +127,7 @@ const std::string RESOURCE_DELETED("amqp:resource-deleted");
const std::string PRECONDITION_FAILED("amqp:precondition-failed");
namespace transaction {
const std::string UNKNOWN_ID("amqp:transaction:unknown-id");
-const std::string ROLLBACK("amqp:transaction:rolback");
+const std::string ROLLBACK("amqp:transaction:rollback");
}
}
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/broker/TxDequeue.cpp b/qpid/cpp/src/qpid/broker/TxDequeue.cpp
index bd83b434a9..e9a2e0ca98 100644
--- a/qpid/cpp/src/qpid/broker/TxDequeue.cpp
+++ b/qpid/cpp/src/qpid/broker/TxDequeue.cpp
@@ -28,7 +28,7 @@ namespace broker {
TxDequeue::TxDequeue(QueueCursor m, boost::shared_ptr<Queue> q,
qpid::framing::SequenceNumber mId, qpid::framing::SequenceNumber rId)
- : message(m), queue(q), messageId(mId), replicationId(rId) {}
+ : message(m), queue(q), messageId(mId), replicationId(rId), releaseOnAbort(true), redeliveredOnAbort(true) {}
bool TxDequeue::prepare(TransactionContext* ctxt) throw()
{
@@ -55,7 +55,10 @@ void TxDequeue::commit() throw()
}
}
-void TxDequeue::rollback() throw() {}
+void TxDequeue::rollback() throw()
+{
+ if (releaseOnAbort) queue->release(message, redeliveredOnAbort);
+}
void TxDequeue::callObserver(const boost::shared_ptr<TransactionObserver>& observer)
{
diff --git a/qpid/cpp/src/qpid/broker/TxDequeue.h b/qpid/cpp/src/qpid/broker/TxDequeue.h
index e312df9e31..e861bcaa77 100644
--- a/qpid/cpp/src/qpid/broker/TxDequeue.h
+++ b/qpid/cpp/src/qpid/broker/TxDequeue.h
@@ -46,6 +46,8 @@ class TxDequeue: public TxOp
boost::shared_ptr<Queue> queue;
qpid::framing::SequenceNumber messageId;
qpid::framing::SequenceNumber replicationId;
+ bool releaseOnAbort;
+ bool redeliveredOnAbort;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 68a735709b..7f4790fa96 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -812,6 +812,7 @@ void Session::abort()
if (txn) {
txn->rollback();
txAborted();
+ txn = boost::intrusive_ptr<TxBuffer>();
}
}