summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java52
1 files changed, 28 insertions, 24 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 556134f984..0de558d152 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -259,6 +259,8 @@ public class Session extends SessionInvoker
{
synchronized (commands)
{
+ attach();
+
for (int i = maxComplete + 1; lt(i, commandsOut); i++)
{
Method m = commands[mod(i, commands.length)];
@@ -299,11 +301,18 @@ public class Session extends SessionInvoker
sessionCommandPoint(m.getId(), 0);
send(m);
}
-
+
sessionCommandPoint(commandsOut, 0);
+
sessionFlush(COMPLETED);
resumer = Thread.currentThread();
state = RESUMING;
+
+ if(isTransacted())
+ {
+ txSelect();
+ }
+
listener.resumed(this);
resumer = null;
}
@@ -567,17 +576,6 @@ public class Session extends SessionInvoker
{
if (m.getEncodedTrack() == Frame.L4)
{
-
- if (state == DETACHED && transacted)
- {
- state = CLOSED;
- delegate.closed(this);
- connection.removeSession(this);
- throw new SessionException(
- "Session failed over, possibly in the middle of a transaction. " +
- "Closing the session. Any Transaction in progress will be rolledback.");
- }
-
if (m.hasPayload())
{
acquireCredit();
@@ -585,24 +583,30 @@ public class Session extends SessionInvoker
synchronized (commands)
{
- if (state == DETACHED && m.isUnreliable())
+ //allow the txSelect operation to be invoked during resume
+ boolean skipWait = m instanceof TxSelect && state == RESUMING;
+
+ if(!skipWait)
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ if (state == DETACHED && m.isUnreliable())
{
- return;
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ return;
+ }
}
- }
- if (state != OPEN && state != CLOSED && state != CLOSING)
- {
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ if (state != OPEN && state != CLOSED && state != CLOSING)
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- w.await();
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
+ {
+ w.await();
+ }
}
}
}