summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java86
1 files changed, 59 insertions, 27 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 0de558d152..321e5256b2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Session
@@ -125,6 +126,8 @@ public class Session extends SessionInvoker
private SessionDetachCode detachCode;
private final Object stateLock = new Object();
+ private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -257,6 +260,7 @@ public class Session extends SessionInvoker
void resume()
{
+ _failoverRequired.set(false);
synchronized (commands)
{
attach();
@@ -459,7 +463,7 @@ public class Session extends SessionInvoker
synchronized (commands)
{
- if (state == DETACHED || state == CLOSING)
+ if (state == DETACHED || state == CLOSING || state == CLOSED)
{
return;
}
@@ -583,30 +587,25 @@ public class Session extends SessionInvoker
synchronized (commands)
{
- //allow the txSelect operation to be invoked during resume
- boolean skipWait = m instanceof TxSelect && state == RESUMING;
-
- if(!skipWait)
+ if (state == DETACHED && m.isUnreliable())
{
- if (state == DETACHED && m.isUnreliable())
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
- {
- return;
- }
+ return;
}
+ }
- if (state != OPEN && state != CLOSED && state != CLOSING)
+ if (state != OPEN && state != CLOSED && state != CLOSING)
+ {
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer) )
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
- {
- w.await();
- }
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
+ w.await();
}
}
}
@@ -674,6 +673,7 @@ public class Session extends SessionInvoker
}
}
}
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
w.await();
}
}
@@ -768,6 +768,14 @@ public class Session extends SessionInvoker
}
}
+ private void checkFailoverRequired(String message)
+ {
+ if (_failoverRequired.get())
+ {
+ throw new SessionException(message);
+ }
+ }
+
protected boolean shouldIssueFlush(int next)
{
return (next % 65536) == 0;
@@ -793,6 +801,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
+ checkFailoverRequired("Session sync was interrupted by failover.");
log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
w.await();
}
@@ -853,13 +862,6 @@ public class Session extends SessionInvoker
}
}
- private ConnectionClose close = null;
-
- void closeCode(ConnectionClose close)
- {
- this.close = close;
- }
-
ExecutionException getException()
{
synchronized (results)
@@ -910,6 +912,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(this, timeout);
while (w.hasTime() && state != CLOSED && !isDone())
{
+ checkFailoverRequired("Operation was interrupted by failover.");
log.debug("%s waiting for result: %s", Session.this, this);
w.await();
}
@@ -921,7 +924,12 @@ public class Session extends SessionInvoker
}
else if (state == CLOSED)
{
- throw new SessionException(getException());
+ ExecutionException ex = getException();
+ if(ex == null)
+ {
+ throw new SessionClosedException();
+ }
+ throw new SessionException(ex);
}
else
{
@@ -1001,6 +1009,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED)
{
+ checkFailoverRequired("close() was interrupted by failover.");
w.await();
}
@@ -1095,6 +1104,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(stateLock, timeout);
while (w.hasTime() && state == NEW)
{
+ checkFailoverRequired("Session opening was interrupted by failover.");
w.await();
}
}
@@ -1117,4 +1127,26 @@ public class Session extends SessionInvoker
{
return stateLock;
}
+
+ protected void notifyFailoverRequired()
+ {
+ //ensure any operations waiting are aborted to
+ //prevent them waiting for timeout for 60 seconds
+ //and possibly preventing failover proceeding
+ _failoverRequired.set(true);
+ synchronized (commands)
+ {
+ commands.notifyAll();
+ }
+ synchronized (results)
+ {
+ for (ResultFuture<?> result : results.values())
+ {
+ synchronized(result)
+ {
+ result.notifyAll();
+ }
+ }
+ }
+ }
}