summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java90
1 files changed, 75 insertions, 15 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index f94edcc655..3dca4fc44e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -53,13 +53,15 @@ public class Session extends SessionInvoker
private static final Logger log = Logger.get(Session.class);
- enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED }
+ enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
class DefaultSessionListener implements SessionListener
{
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
log.info("message: %s", xfr);
@@ -107,6 +109,8 @@ public class Session extends SessionInvoker
private volatile boolean flowControl = false;
private Semaphore credit = new Semaphore(0);
+ private Thread resumer = null;
+
Session(Connection connection, Binary name, long expiry)
{
this.connection = connection;
@@ -234,15 +238,21 @@ public class Session extends SessionInvoker
for (int i = maxComplete + 1; lt(i, commandsOut); i++)
{
Method m = commands[mod(i, commands.length)];
- if (m != null)
+ if (m == null)
{
- sessionCommandPoint(m.getId(), 0);
- send(m);
+ m = new ExecutionSync();
+ m.setId(i);
}
+ sessionCommandPoint(m.getId(), 0);
+ send(m);
}
sessionCommandPoint(commandsOut, 0);
sessionFlush(COMPLETED);
+ resumer = Thread.currentThread();
+ state = RESUMING;
+ listener.resumed(this);
+ resumer = null;
}
}
@@ -384,7 +394,15 @@ public class Session extends SessionInvoker
{
copy = processed.copy();
}
- sessionCompleted(copy, options);
+
+ synchronized (commands)
+ {
+ if (state == DETACHED || state == CLOSING)
+ {
+ return;
+ }
+ sessionCompleted(copy, options);
+ }
}
void knownComplete(RangeSet kc)
@@ -484,12 +502,25 @@ public class Session extends SessionInvoker
synchronized (commands)
{
+ if (state == DETACHED && m.isUnreliable())
+ {
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ return;
+ }
+ }
+
if (state != OPEN && state != CLOSED)
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- w.await();
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
+ {
+ w.await();
+ }
}
}
@@ -497,8 +528,24 @@ public class Session extends SessionInvoker
{
case OPEN:
break;
+ case RESUMING:
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ throw new SessionException
+ ("timed out waiting for resume to finish");
+ }
+ break;
case CLOSED:
- throw new SessionClosedException();
+ ExecutionException exc = getException();
+ if (exc != null)
+ {
+ throw new SessionException(exc);
+ }
+ else
+ {
+ throw new SessionClosedException();
+ }
default:
throw new SessionException
(String.format
@@ -512,9 +559,9 @@ public class Session extends SessionInvoker
if (isFull(next))
{
Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && isFull(next))
+ while (w.hasTime() && isFull(next) && state != CLOSED)
{
- if (state == OPEN)
+ if (state == OPEN || state == RESUMING)
{
try
{
@@ -538,6 +585,19 @@ public class Session extends SessionInvoker
}
}
+ if (state == CLOSED)
+ {
+ ExecutionException exc = getException();
+ if (exc != null)
+ {
+ throw new SessionException(exc);
+ }
+ else
+ {
+ throw new SessionClosedException();
+ }
+ }
+
if (isFull(next))
{
throw new SessionException("timed out waiting for completion");
@@ -547,7 +607,7 @@ public class Session extends SessionInvoker
{
sessionCommandPoint(0, 0);
}
- if (expiry > 0)
+ if (expiry > 0 && !m.isUnreliable())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
@@ -815,9 +875,9 @@ public class Session extends SessionInvoker
{
throw new SessionException("close() timed out");
}
-
- connection.removeSession(this);
}
+
+ connection.removeSession(this);
}
public void exception(Throwable t)
@@ -829,7 +889,7 @@ public class Session extends SessionInvoker
{
synchronized (commands)
{
- if (expiry == 0)
+ if (expiry == 0 || getException() != null)
{
state = CLOSED;
}