summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-10-06 16:46:58 +0000
committerRobert Gemmell <robbie@apache.org>2011-10-06 16:46:58 +0000
commite761947acc3a6c64c2e339c5c01eaa7e79ccffda (patch)
tree27078edc2e5f46c1a44edb25c146461c4ee7db84
parentd81c553c1f653ae9c536d8f926c231038b4b3532 (diff)
downloadqpid-python-e761947acc3a6c64c2e339c5c01eaa7e79ccffda.tar.gz
QPID-3524: enable support for failing over transacted sessions, ensuring txSelect is sent after the command-point process is complete, but before the session is marked in the open state.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1179702 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java18
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java52
2 files changed, 30 insertions, 40 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 347bf8e649..1c521244d0 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -485,26 +485,12 @@ public class Connection extends ConnectionInvoker
{
synchronized (lock)
{
- List <Binary> transactedSessions = new ArrayList();
for (Session ssn : sessions.values())
{
- if (ssn.isTransacted())
- {
- transactedSessions.add(ssn.getName());
- ssn.setState(Session.State.CLOSED);
- }
- else
- {
- map(ssn);
- ssn.attach();
- ssn.resume();
- }
+ map(ssn);
+ ssn.resume();
}
- for (Binary ssn_name : transactedSessions)
- {
- sessions.remove(ssn_name);
- }
setState(OPEN);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 556134f984..0de558d152 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -259,6 +259,8 @@ public class Session extends SessionInvoker
{
synchronized (commands)
{
+ attach();
+
for (int i = maxComplete + 1; lt(i, commandsOut); i++)
{
Method m = commands[mod(i, commands.length)];
@@ -299,11 +301,18 @@ public class Session extends SessionInvoker
sessionCommandPoint(m.getId(), 0);
send(m);
}
-
+
sessionCommandPoint(commandsOut, 0);
+
sessionFlush(COMPLETED);
resumer = Thread.currentThread();
state = RESUMING;
+
+ if(isTransacted())
+ {
+ txSelect();
+ }
+
listener.resumed(this);
resumer = null;
}
@@ -567,17 +576,6 @@ public class Session extends SessionInvoker
{
if (m.getEncodedTrack() == Frame.L4)
{
-
- if (state == DETACHED && transacted)
- {
- state = CLOSED;
- delegate.closed(this);
- connection.removeSession(this);
- throw new SessionException(
- "Session failed over, possibly in the middle of a transaction. " +
- "Closing the session. Any Transaction in progress will be rolledback.");
- }
-
if (m.hasPayload())
{
acquireCredit();
@@ -585,24 +583,30 @@ public class Session extends SessionInvoker
synchronized (commands)
{
- if (state == DETACHED && m.isUnreliable())
+ //allow the txSelect operation to be invoked during resume
+ boolean skipWait = m instanceof TxSelect && state == RESUMING;
+
+ if(!skipWait)
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ if (state == DETACHED && m.isUnreliable())
{
- return;
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ return;
+ }
}
- }
- if (state != OPEN && state != CLOSED && state != CLOSING)
- {
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ if (state != OPEN && state != CLOSED && state != CLOSING)
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- w.await();
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
+ {
+ w.await();
+ }
}
}
}