diff options
Diffstat (limited to 'cpp/src/qpid/broker/TxPublish.cpp')
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.cpp | 54 |
1 files changed, 41 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index 17b99fd883..4b083033ea 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -26,9 +26,14 @@ using namespace qpid::broker; TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {} -bool TxPublish::prepare(TransactionContext* ctxt) throw(){ +bool TxPublish::prepare(TransactionContext* ctxt) throw() +{ try{ - for_each(queues.begin(), queues.end(), Prepare(ctxt, msg)); + while (!queues.empty()) { + prepare(ctxt, queues.front()); + prepared.push_back(queues.front()); + queues.pop_front(); + } return true; }catch(const std::exception& e){ QPID_LOG(error, "Failed to prepare: " << e.what()); @@ -38,11 +43,30 @@ bool TxPublish::prepare(TransactionContext* ctxt) throw(){ return false; } -void TxPublish::commit() throw(){ - for_each(queues.begin(), queues.end(), Commit(msg)); +void TxPublish::commit() throw() +{ + try { + for_each(prepared.begin(), prepared.end(), Commit(msg)); + if (msg->checkContentReleasable()) { + msg->releaseContent(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to commit (unknown error)"); + } } -void TxPublish::rollback() throw(){ +void TxPublish::rollback() throw() +{ + try { + for_each(prepared.begin(), prepared.end(), Rollback(msg)); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to complete rollback: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to complete rollback (unknown error)"); + } + } void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){ @@ -54,16 +78,14 @@ void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){ } } -TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg) - : ctxt(_ctxt), msg(_msg){} - -void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){ +void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue) +{ if (!queue->enqueue(ctxt, msg)){ /** - * if not store then mark message for ack and deleivery once - * commit happens, as async IO will never set it when no store - * exists - */ + * if not store then mark message for ack and deleivery once + * commit happens, as async IO will never set it when no store + * exists + */ msg->enqueueComplete(); } } @@ -74,6 +96,12 @@ void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){ queue->process(msg); } +TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){} + +void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){ + queue->enqueueAborted(msg); +} + uint64_t TxPublish::contentSize () { return msg->contentSize (); |