summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java87
1 files changed, 47 insertions, 40 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 5a9ea73cae..3fc596d0eb 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -33,7 +33,6 @@ import static org.apache.qpid.transport.Session.State.RESUMING;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.transport.network.Frame;
-import static org.apache.qpid.transport.util.Functions.mod;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
import static org.apache.qpid.util.Serial.ge;
@@ -44,11 +43,9 @@ import static org.apache.qpid.util.Serial.max;
import static org.apache.qpid.util.Strings.toUTF8;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class Session extends SessionInvoker
{
private static final Logger log = Logger.get(Session.class);
-
+
public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
static class DefaultSessionListener implements SessionListener
@@ -113,7 +110,9 @@ public class Session extends SessionInvoker
// outgoing command count
private int commandsOut = 0;
- private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)];
+ private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024);
+ private Map<Integer,Method> commands = new HashMap<Integer, Method>();
+ private final Object commandsLock = new Object();
private int commandBytes = 0;
private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024);
private int maxComplete = commandsOut - 1;
@@ -196,7 +195,7 @@ public class Session extends SessionInvoker
public void setAutoSync(boolean value)
{
- synchronized (commands)
+ synchronized (commandsLock)
{
this.autoSync = value;
}
@@ -204,10 +203,10 @@ public class Session extends SessionInvoker
protected void setState(State state)
{
- synchronized (commands)
+ synchronized (commandsLock)
{
this.state = state;
- commands.notifyAll();
+ commandsLock.notifyAll();
}
}
@@ -276,13 +275,13 @@ public class Session extends SessionInvoker
void resume()
{
_failoverRequired.set(false);
- synchronized (commands)
+ synchronized (commandsLock)
{
attach();
for (int i = maxComplete + 1; lt(i, commandsOut); i++)
{
- Method m = commands[mod(i, commands.length)];
+ Method m = getCommand(i);
if (m == null)
{
m = new ExecutionSync();
@@ -337,11 +336,27 @@ public class Session extends SessionInvoker
}
}
+ private Method getCommand(int i)
+ {
+ return commands.get(i);
+ }
+
+ private void setCommand(int commandId, Method command)
+ {
+ commands.put(commandId, command);
+ }
+
+ private Method removeCommand(int id)
+ {
+ return commands.remove(id);
+ }
+
void dump()
{
- synchronized (commands)
+ synchronized (commandsLock)
{
- for (Method m : commands)
+ TreeMap<Integer, Method> ordered = new TreeMap<Integer, Method>(commands);
+ for (Method m : ordered.values())
{
if (m != null)
{
@@ -484,7 +499,7 @@ public class Session extends SessionInvoker
copy = processed.copy();
}
- synchronized (commands)
+ synchronized (commandsLock)
{
if (state == DETACHED || state == CLOSING || state == CLOSED)
{
@@ -539,18 +554,16 @@ public class Session extends SessionInvoker
{
log.debug("%s complete(%d, %d)", this, lower, upper);
}
- synchronized (commands)
+ synchronized (commandsLock)
{
int old = maxComplete;
for (int id = max(maxComplete, lower); le(id, upper); id++)
{
- int idx = mod(id, commands.length);
- Method m = commands[idx];
+ Method m = removeCommand(id);
if (m != null)
{
commandBytes -= m.getBodySize();
m.complete();
- commands[idx] = null;
}
}
if (le(lower, maxComplete + 1))
@@ -563,7 +576,7 @@ public class Session extends SessionInvoker
log.debug("%s commands remaining: %s", this, commandsOut - maxComplete);
}
- commands.notifyAll();
+ commandsLock.notifyAll();
return gt(maxComplete, old);
}
}
@@ -596,7 +609,7 @@ public class Session extends SessionInvoker
protected boolean isCommandsFull(int id)
{
- return id - maxComplete >= commands.length;
+ return id - maxComplete >= commandLimit;
}
public void invoke(Method m)
@@ -613,7 +626,7 @@ public class Session extends SessionInvoker
acquireCredit();
}
- synchronized (commands)
+ synchronized (commandsLock)
{
if (state == DETACHED && m.isUnreliable())
{
@@ -629,7 +642,7 @@ public class Session extends SessionInvoker
Thread current = Thread.currentThread();
if (!current.equals(resumer) )
{
- Waiter w = new Waiter(commands, timeout);
+ Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && (state != OPEN && state != CLOSED))
{
checkFailoverRequired("Command was interrupted because of failover, before being sent");
@@ -678,7 +691,7 @@ public class Session extends SessionInvoker
if (isFull(next))
{
- Waiter w = new Waiter(commands, timeout);
+ Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && isFull(next) && state != CLOSED)
{
if (state == OPEN || state == RESUMING)
@@ -735,7 +748,7 @@ public class Session extends SessionInvoker
if ((replayTransfer) || m.hasCompletionListener())
{
- commands[mod(next, commands.length)] = m;
+ setCommand(next, m);
commandBytes += m.getBodySize();
}
if (autoSync)
@@ -817,7 +830,7 @@ public class Session extends SessionInvoker
public void sync(long timeout)
{
log.debug("%s sync()", this);
- synchronized (commands)
+ synchronized (commandsLock)
{
int point = commandsOut - 1;
@@ -826,19 +839,13 @@ public class Session extends SessionInvoker
executionSync(SYNC);
}
- Waiter w = new Waiter(commands, timeout);
+ Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
checkFailoverRequired("Session sync was interrupted by failover.");
if(log.isDebugEnabled())
{
- List<Method> waitingFor =
- Arrays.asList(commands)
- .subList(mod(maxComplete,commands.length),
- mod(commandsOut-1, commands.length) < mod(maxComplete, commands.length)
- ? commands.length-1
- : mod(commandsOut-1, commands.length));
- log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, waitingFor);
+ log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
}
w.await();
}
@@ -909,7 +916,7 @@ public class Session extends SessionInvoker
protected <T> Future<T> invoke(Method m, Class<T> klass)
{
- synchronized (commands)
+ synchronized (commandsLock)
{
int command = commandsOut;
ResultFuture<T> future = new ResultFuture<T>(klass);
@@ -1019,7 +1026,7 @@ public class Session extends SessionInvoker
{
log.debug("Closing [%s] in state [%s]", this, state);
}
- synchronized (commands)
+ synchronized (commandsLock)
{
switch(state)
{
@@ -1043,7 +1050,7 @@ public class Session extends SessionInvoker
protected void awaitClose()
{
- Waiter w = new Waiter(commands, timeout);
+ Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && state != CLOSED)
{
checkFailoverRequired("close() was interrupted by failover.");
@@ -1063,7 +1070,7 @@ public class Session extends SessionInvoker
public void closed()
{
- synchronized (commands)
+ synchronized (commandsLock)
{
if (closing || getException() != null)
{
@@ -1074,7 +1081,7 @@ public class Session extends SessionInvoker
state = DETACHED;
}
- commands.notifyAll();
+ commandsLock.notifyAll();
synchronized (results)
{
@@ -1171,9 +1178,9 @@ public class Session extends SessionInvoker
//prevent them waiting for timeout for 60 seconds
//and possibly preventing failover proceeding
_failoverRequired.set(true);
- synchronized (commands)
+ synchronized (commandsLock)
{
- commands.notifyAll();
+ commandsLock.notifyAll();
}
synchronized (results)
{