diff options
author | Gordon Sim <gsim@apache.org> | 2014-07-17 13:19:51 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2014-07-17 13:19:51 +0000 |
commit | a9de64013865b51e66ac58b465a5eb22d369c560 (patch) | |
tree | 2ad57d37b2ee8d90a901ba3bb2d5519ba3a4de11 | |
parent | 247af45299ffe6621600d9ec9e72e5090907d2fc (diff) | |
download | qpid-python-a9de64013865b51e66ac58b465a5eb22d369c560.tar.gz |
QPID-5887: revised approach to implict abort
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1611349 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 6 | ||||
-rw-r--r-- | cpp/src/tests/qpid-txtest2.cpp | 29 |
4 files changed, 67 insertions, 30 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index ef8a82d2ea..3600e4d945 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -317,7 +317,9 @@ bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& ) try { qpid::sys::Mutex::ScopedLock l(lock); for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { - getImplPtr(i->second)->setSession(connection.newSession(i->first)); + if (!getImplPtr(i->second)->isTransactional()) { + getImplPtr(i->second)->setSession(connection.newSession(i->first)); + } } return true; } catch (const qpid::TransportFailure& e) { diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index b6ae9514b3..e5e696439b 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -56,11 +56,36 @@ namespace amqp0_10 { typedef qpid::sys::Mutex::ScopedLock ScopedLock; typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock; -SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} +SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {} + +bool SessionImpl::isTransactional() const +{ + return transactional; +} + +void SessionImpl::abortTransaction() +{ + ScopedLock l(lock); + aborted = true; +} + +void SessionImpl::checkAborted() +{ + ScopedLock l(lock); + checkAbortedLH(l); +} + +void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&) +{ + if (aborted) { + throw TransactionAborted("Transaction implicitly aborted"); + } +} void SessionImpl::checkError() { ScopedLock l(lock); + checkAbortedLH(l); qpid::client::SessionBase_0_10Access s(session); try { s.get()->assertOpen(); @@ -185,27 +210,20 @@ template <class T> void getFreeKey(std::string& key, T& map) key = name; } - void SessionImpl::setSession(qpid::client::Session s) { - ScopedLock l(lock); - if (session.isValid() && transactional) { - qpid::client::SessionBase_0_10Access ssn_ptr(session); - ssn_ptr.get()->setException(new TransactionAborted("Transaction aborted due to transport failure")); - } else { - session = s; - incoming.setSession(session); - if (transactional) { - session.txSelect(); - } - for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { - getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); - } - for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { - getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); - } - session.sync(); + session = s; + incoming.setSession(session); + if (transactional) { + session.txSelect(); + } + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { + getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); + } + for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { + getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); } + session.sync(); } struct SessionImpl::CreateReceiver : Command @@ -366,6 +384,7 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout) { while (true) { + checkAborted(); try { std::string destination; if (incoming.getNextDestination(destination, adjust(timeout))) { @@ -548,6 +567,7 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { + if (transactional) abortTransaction(); connection->reopen(); } diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index c7dea77d18..b2e4cf3f78 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -78,6 +78,7 @@ class SessionImpl : public qpid::messaging::SessionImpl qpid::messaging::Connection getConnection() const; void checkError(); bool hasError(); + bool isTransactional() const; bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); @@ -96,6 +97,7 @@ class SessionImpl : public qpid::messaging::SessionImpl template <class T> bool execute(T& f) { try { + checkAborted(); f(); return true; } catch (const qpid::TransportFailure&) { @@ -129,12 +131,16 @@ class SessionImpl : public qpid::messaging::SessionImpl Receivers receivers; Senders senders; const bool transactional; + bool aborted; bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&); bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout); bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); bool backoff(); + void abortTransaction(); + void checkAborted(); + void checkAbortedLH(const qpid::sys::Mutex::ScopedLock&); void commitImpl(); void rollbackImpl(); diff --git a/cpp/src/tests/qpid-txtest2.cpp b/cpp/src/tests/qpid-txtest2.cpp index e9fa4282d4..cdd263a081 100644 --- a/cpp/src/tests/qpid-txtest2.cpp +++ b/cpp/src/tests/qpid-txtest2.cpp @@ -186,18 +186,27 @@ struct Transfer : public TransactionalClient, public Runnable Sender sender(session.createSender(target)); Receiver receiver(session.createReceiver(source)); receiver.setCapacity(opts.capacity); - for (uint t = 0; t < opts.txCount; t++) { - for (uint m = 0; m < opts.msgsPerTx; m++) { - Message msg = receiver.fetch(Duration::SECOND*30); - if (msg.getContentSize() != opts.size) { - std::ostringstream oss; - oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size; - throw std::runtime_error(oss.str()); + for (uint t = 0; t < opts.txCount;) { + try { + for (uint m = 0; m < opts.msgsPerTx; m++) { + Message msg = receiver.fetch(Duration::SECOND*30); + if (msg.getContentSize() != opts.size) { + std::ostringstream oss; + oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size; + throw std::runtime_error(oss.str()); + } + sender.send(msg); } - sender.send(msg); + session.commit(); + t++; + if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl; + } catch (const TransactionAborted&) { + std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl; + session = connection.createTransactionalSession(); + sender = session.createSender(target); + receiver = session.createReceiver(source); + receiver.setCapacity(opts.capacity); } - QPID_LOG(info, "Moved " << opts.msgsPerTx << " from " << source << " to " << target); - session.commit(); } sender.close(); receiver.close(); |