diff options
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 6 |
3 files changed, 48 insertions, 20 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index ef8a82d2ea..3600e4d945 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -317,7 +317,9 @@ bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& ) try { qpid::sys::Mutex::ScopedLock l(lock); for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { - getImplPtr(i->second)->setSession(connection.newSession(i->first)); + if (!getImplPtr(i->second)->isTransactional()) { + getImplPtr(i->second)->setSession(connection.newSession(i->first)); + } } return true; } catch (const qpid::TransportFailure& e) { diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index b6ae9514b3..e5e696439b 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -56,11 +56,36 @@ namespace amqp0_10 { typedef qpid::sys::Mutex::ScopedLock ScopedLock; typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock; -SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} +SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {} + +bool SessionImpl::isTransactional() const +{ + return transactional; +} + +void SessionImpl::abortTransaction() +{ + ScopedLock l(lock); + aborted = true; +} + +void SessionImpl::checkAborted() +{ + ScopedLock l(lock); + checkAbortedLH(l); +} + +void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&) +{ + if (aborted) { + throw TransactionAborted("Transaction implicitly aborted"); + } +} void SessionImpl::checkError() { ScopedLock l(lock); + checkAbortedLH(l); qpid::client::SessionBase_0_10Access s(session); try { s.get()->assertOpen(); @@ -185,27 +210,20 @@ template <class T> void getFreeKey(std::string& key, T& map) key = name; } - void SessionImpl::setSession(qpid::client::Session s) { - ScopedLock l(lock); - if (session.isValid() && transactional) { - qpid::client::SessionBase_0_10Access ssn_ptr(session); - ssn_ptr.get()->setException(new TransactionAborted("Transaction aborted due to transport failure")); - } else { - session = s; - incoming.setSession(session); - if (transactional) { - session.txSelect(); - } - for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { - getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); - } - for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { - getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); - } - session.sync(); + session = s; + incoming.setSession(session); + if (transactional) { + session.txSelect(); + } + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { + getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); + } + for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { + getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); } + session.sync(); } struct SessionImpl::CreateReceiver : Command @@ -366,6 +384,7 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout) { while (true) { + checkAborted(); try { std::string destination; if (incoming.getNextDestination(destination, adjust(timeout))) { @@ -548,6 +567,7 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { + if (transactional) abortTransaction(); connection->reopen(); } diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index c7dea77d18..b2e4cf3f78 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -78,6 +78,7 @@ class SessionImpl : public qpid::messaging::SessionImpl qpid::messaging::Connection getConnection() const; void checkError(); bool hasError(); + bool isTransactional() const; bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); @@ -96,6 +97,7 @@ class SessionImpl : public qpid::messaging::SessionImpl template <class T> bool execute(T& f) { try { + checkAborted(); f(); return true; } catch (const qpid::TransportFailure&) { @@ -129,12 +131,16 @@ class SessionImpl : public qpid::messaging::SessionImpl Receivers receivers; Senders senders; const bool transactional; + bool aborted; bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&); bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout); bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); bool backoff(); + void abortTransaction(); + void checkAborted(); + void checkAbortedLH(const qpid::sys::Mutex::ScopedLock&); void commitImpl(); void rollbackImpl(); |