summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-07-17 13:19:51 +0000
committerGordon Sim <gsim@apache.org>2014-07-17 13:19:51 +0000
commita9de64013865b51e66ac58b465a5eb22d369c560 (patch)
tree2ad57d37b2ee8d90a901ba3bb2d5519ba3a4de11
parent247af45299ffe6621600d9ec9e72e5090907d2fc (diff)
downloadqpid-python-a9de64013865b51e66ac58b465a5eb22d369c560.tar.gz
QPID-5887: revised approach to implict abort
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1611349 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp4
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp58
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h6
-rw-r--r--cpp/src/tests/qpid-txtest2.cpp29
4 files changed, 67 insertions, 30 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index ef8a82d2ea..3600e4d945 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -317,7 +317,9 @@ bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& )
try {
qpid::sys::Mutex::ScopedLock l(lock);
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
- getImplPtr(i->second)->setSession(connection.newSession(i->first));
+ if (!getImplPtr(i->second)->isTransactional()) {
+ getImplPtr(i->second)->setSession(connection.newSession(i->first));
+ }
}
return true;
} catch (const qpid::TransportFailure& e) {
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index b6ae9514b3..e5e696439b 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -56,11 +56,36 @@ namespace amqp0_10 {
typedef qpid::sys::Mutex::ScopedLock ScopedLock;
typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
-SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
+SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {}
+
+bool SessionImpl::isTransactional() const
+{
+ return transactional;
+}
+
+void SessionImpl::abortTransaction()
+{
+ ScopedLock l(lock);
+ aborted = true;
+}
+
+void SessionImpl::checkAborted()
+{
+ ScopedLock l(lock);
+ checkAbortedLH(l);
+}
+
+void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&)
+{
+ if (aborted) {
+ throw TransactionAborted("Transaction implicitly aborted");
+ }
+}
void SessionImpl::checkError()
{
ScopedLock l(lock);
+ checkAbortedLH(l);
qpid::client::SessionBase_0_10Access s(session);
try {
s.get()->assertOpen();
@@ -185,27 +210,20 @@ template <class T> void getFreeKey(std::string& key, T& map)
key = name;
}
-
void SessionImpl::setSession(qpid::client::Session s)
{
- ScopedLock l(lock);
- if (session.isValid() && transactional) {
- qpid::client::SessionBase_0_10Access ssn_ptr(session);
- ssn_ptr.get()->setException(new TransactionAborted("Transaction aborted due to transport failure"));
- } else {
- session = s;
- incoming.setSession(session);
- if (transactional) {
- session.txSelect();
- }
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
- getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
- }
- for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
- getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
- }
- session.sync();
+ session = s;
+ incoming.setSession(session);
+ if (transactional) {
+ session.txSelect();
+ }
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
+ }
+ for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
+ getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
}
+ session.sync();
}
struct SessionImpl::CreateReceiver : Command
@@ -366,6 +384,7 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message,
bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout)
{
while (true) {
+ checkAborted();
try {
std::string destination;
if (incoming.getNextDestination(destination, adjust(timeout))) {
@@ -548,6 +567,7 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
+ if (transactional) abortTransaction();
connection->reopen();
}
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index c7dea77d18..b2e4cf3f78 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -78,6 +78,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
qpid::messaging::Connection getConnection() const;
void checkError();
bool hasError();
+ bool isTransactional() const;
bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
@@ -96,6 +97,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
template <class T> bool execute(T& f)
{
try {
+ checkAborted();
f();
return true;
} catch (const qpid::TransportFailure&) {
@@ -129,12 +131,16 @@ class SessionImpl : public qpid::messaging::SessionImpl
Receivers receivers;
Senders senders;
const bool transactional;
+ bool aborted;
bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&);
bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout);
bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
void reconnect();
bool backoff();
+ void abortTransaction();
+ void checkAborted();
+ void checkAbortedLH(const qpid::sys::Mutex::ScopedLock&);
void commitImpl();
void rollbackImpl();
diff --git a/cpp/src/tests/qpid-txtest2.cpp b/cpp/src/tests/qpid-txtest2.cpp
index e9fa4282d4..cdd263a081 100644
--- a/cpp/src/tests/qpid-txtest2.cpp
+++ b/cpp/src/tests/qpid-txtest2.cpp
@@ -186,18 +186,27 @@ struct Transfer : public TransactionalClient, public Runnable
Sender sender(session.createSender(target));
Receiver receiver(session.createReceiver(source));
receiver.setCapacity(opts.capacity);
- for (uint t = 0; t < opts.txCount; t++) {
- for (uint m = 0; m < opts.msgsPerTx; m++) {
- Message msg = receiver.fetch(Duration::SECOND*30);
- if (msg.getContentSize() != opts.size) {
- std::ostringstream oss;
- oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size;
- throw std::runtime_error(oss.str());
+ for (uint t = 0; t < opts.txCount;) {
+ try {
+ for (uint m = 0; m < opts.msgsPerTx; m++) {
+ Message msg = receiver.fetch(Duration::SECOND*30);
+ if (msg.getContentSize() != opts.size) {
+ std::ostringstream oss;
+ oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size;
+ throw std::runtime_error(oss.str());
+ }
+ sender.send(msg);
}
- sender.send(msg);
+ session.commit();
+ t++;
+ if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
+ } catch (const TransactionAborted&) {
+ std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
+ session = connection.createTransactionalSession();
+ sender = session.createSender(target);
+ receiver = session.createReceiver(source);
+ receiver.setCapacity(opts.capacity);
}
- QPID_LOG(info, "Moved " << opts.msgsPerTx << " from " << source << " to " << target);
- session.commit();
}
sender.close();
receiver.close();