diff options
author | Robert Gemmell <robbie@apache.org> | 2011-10-06 16:46:58 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-10-06 16:46:58 +0000 |
commit | e761947acc3a6c64c2e339c5c01eaa7e79ccffda (patch) | |
tree | 27078edc2e5f46c1a44edb25c146461c4ee7db84 | |
parent | d81c553c1f653ae9c536d8f926c231038b4b3532 (diff) | |
download | qpid-python-e761947acc3a6c64c2e339c5c01eaa7e79ccffda.tar.gz |
QPID-3524: enable support for failing over transacted sessions, ensuring txSelect is sent after the command-point process is complete, but before the session is marked in the open state.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1179702 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/Connection.java | 18 | ||||
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/Session.java | 52 |
2 files changed, 30 insertions, 40 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 347bf8e649..1c521244d0 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -485,26 +485,12 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { - List <Binary> transactedSessions = new ArrayList(); for (Session ssn : sessions.values()) { - if (ssn.isTransacted()) - { - transactedSessions.add(ssn.getName()); - ssn.setState(Session.State.CLOSED); - } - else - { - map(ssn); - ssn.attach(); - ssn.resume(); - } + map(ssn); + ssn.resume(); } - for (Binary ssn_name : transactedSessions) - { - sessions.remove(ssn_name); - } setState(OPEN); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 556134f984..0de558d152 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -259,6 +259,8 @@ public class Session extends SessionInvoker { synchronized (commands) { + attach(); + for (int i = maxComplete + 1; lt(i, commandsOut); i++) { Method m = commands[mod(i, commands.length)]; @@ -299,11 +301,18 @@ public class Session extends SessionInvoker sessionCommandPoint(m.getId(), 0); send(m); } - + sessionCommandPoint(commandsOut, 0); + sessionFlush(COMPLETED); resumer = Thread.currentThread(); state = RESUMING; + + if(isTransacted()) + { + txSelect(); + } + listener.resumed(this); resumer = null; } @@ -567,17 +576,6 @@ public class Session extends SessionInvoker { if (m.getEncodedTrack() == Frame.L4) { - - if (state == DETACHED && transacted) - { - state = CLOSED; - delegate.closed(this); - connection.removeSession(this); - throw new SessionException( - "Session failed over, possibly in the middle of a transaction. " + - "Closing the session. Any Transaction in progress will be rolledback."); - } - if (m.hasPayload()) { acquireCredit(); @@ -585,24 +583,30 @@ public class Session extends SessionInvoker synchronized (commands) { - if (state == DETACHED && m.isUnreliable()) + //allow the txSelect operation to be invoked during resume + boolean skipWait = m instanceof TxSelect && state == RESUMING; + + if(!skipWait) { - Thread current = Thread.currentThread(); - if (!current.equals(resumer)) + if (state == DETACHED && m.isUnreliable()) { - return; + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) + { + return; + } } - } - if (state != OPEN && state != CLOSED && state != CLOSING) - { - Thread current = Thread.currentThread(); - if (!current.equals(resumer)) + if (state != OPEN && state != CLOSED && state != CLOSING) { - Waiter w = new Waiter(commands, timeout); - while (w.hasTime() && (state != OPEN && state != CLOSED)) + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) { - w.await(); + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && (state != OPEN && state != CLOSED)) + { + w.await(); + } } } } |