diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java | 52 |
1 files changed, 28 insertions, 24 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 556134f984..0de558d152 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -259,6 +259,8 @@ public class Session extends SessionInvoker { synchronized (commands) { + attach(); + for (int i = maxComplete + 1; lt(i, commandsOut); i++) { Method m = commands[mod(i, commands.length)]; @@ -299,11 +301,18 @@ public class Session extends SessionInvoker sessionCommandPoint(m.getId(), 0); send(m); } - + sessionCommandPoint(commandsOut, 0); + sessionFlush(COMPLETED); resumer = Thread.currentThread(); state = RESUMING; + + if(isTransacted()) + { + txSelect(); + } + listener.resumed(this); resumer = null; } @@ -567,17 +576,6 @@ public class Session extends SessionInvoker { if (m.getEncodedTrack() == Frame.L4) { - - if (state == DETACHED && transacted) - { - state = CLOSED; - delegate.closed(this); - connection.removeSession(this); - throw new SessionException( - "Session failed over, possibly in the middle of a transaction. " + - "Closing the session. Any Transaction in progress will be rolledback."); - } - if (m.hasPayload()) { acquireCredit(); @@ -585,24 +583,30 @@ public class Session extends SessionInvoker synchronized (commands) { - if (state == DETACHED && m.isUnreliable()) + //allow the txSelect operation to be invoked during resume + boolean skipWait = m instanceof TxSelect && state == RESUMING; + + if(!skipWait) { - Thread current = Thread.currentThread(); - if (!current.equals(resumer)) + if (state == DETACHED && m.isUnreliable()) { - return; + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) + { + return; + } } - } - if (state != OPEN && state != CLOSED && state != CLOSING) - { - Thread current = Thread.currentThread(); - if (!current.equals(resumer)) + if (state != OPEN && state != CLOSED && state != CLOSING) { - Waiter w = new Waiter(commands, timeout); - while (w.hasTime() && (state != OPEN && state != CLOSED)) + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) { - w.await(); + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && (state != OPEN && state != CLOSED)) + { + w.await(); + } } } } |