diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/SessionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 58 |
1 files changed, 39 insertions, 19 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index b6ae9514b3..e5e696439b 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -56,11 +56,36 @@ namespace amqp0_10 { typedef qpid::sys::Mutex::ScopedLock ScopedLock; typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock; -SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} +SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {} + +bool SessionImpl::isTransactional() const +{ + return transactional; +} + +void SessionImpl::abortTransaction() +{ + ScopedLock l(lock); + aborted = true; +} + +void SessionImpl::checkAborted() +{ + ScopedLock l(lock); + checkAbortedLH(l); +} + +void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&) +{ + if (aborted) { + throw TransactionAborted("Transaction implicitly aborted"); + } +} void SessionImpl::checkError() { ScopedLock l(lock); + checkAbortedLH(l); qpid::client::SessionBase_0_10Access s(session); try { s.get()->assertOpen(); @@ -185,27 +210,20 @@ template <class T> void getFreeKey(std::string& key, T& map) key = name; } - void SessionImpl::setSession(qpid::client::Session s) { - ScopedLock l(lock); - if (session.isValid() && transactional) { - qpid::client::SessionBase_0_10Access ssn_ptr(session); - ssn_ptr.get()->setException(new TransactionAborted("Transaction aborted due to transport failure")); - } else { - session = s; - incoming.setSession(session); - if (transactional) { - session.txSelect(); - } - for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { - getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); - } - for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { - getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); - } - session.sync(); + session = s; + incoming.setSession(session); + if (transactional) { + session.txSelect(); + } + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { + getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); + } + for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { + getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); } + session.sync(); } struct SessionImpl::CreateReceiver : Command @@ -366,6 +384,7 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout) { while (true) { + checkAborted(); try { std::string destination; if (incoming.getNextDestination(destination, adjust(timeout))) { @@ -548,6 +567,7 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { + if (transactional) abortTransaction(); connection->reopen(); } |