diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java | 87 |
1 files changed, 47 insertions, 40 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 5a9ea73cae..3fc596d0eb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -33,7 +33,6 @@ import static org.apache.qpid.transport.Session.State.RESUMING; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.transport.network.Frame; -import static org.apache.qpid.transport.util.Functions.mod; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; import static org.apache.qpid.util.Serial.ge; @@ -44,11 +43,9 @@ import static org.apache.qpid.util.Serial.max; import static org.apache.qpid.util.Strings.toUTF8; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,7 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class Session extends SessionInvoker { private static final Logger log = Logger.get(Session.class); - + public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED } static class DefaultSessionListener implements SessionListener @@ -113,7 +110,9 @@ public class Session extends SessionInvoker // outgoing command count private int commandsOut = 0; - private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)]; + private final int commandLimit = Integer.getInteger("qpid.session.command_limit", 64 * 1024); + private Map<Integer,Method> commands = new HashMap<Integer, Method>(); + private final Object commandsLock = new Object(); private int commandBytes = 0; private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024); private int maxComplete = commandsOut - 1; @@ -196,7 +195,7 @@ public class Session extends SessionInvoker public void setAutoSync(boolean value) { - synchronized (commands) + synchronized (commandsLock) { this.autoSync = value; } @@ -204,10 +203,10 @@ public class Session extends SessionInvoker protected void setState(State state) { - synchronized (commands) + synchronized (commandsLock) { this.state = state; - commands.notifyAll(); + commandsLock.notifyAll(); } } @@ -276,13 +275,13 @@ public class Session extends SessionInvoker void resume() { _failoverRequired.set(false); - synchronized (commands) + synchronized (commandsLock) { attach(); for (int i = maxComplete + 1; lt(i, commandsOut); i++) { - Method m = commands[mod(i, commands.length)]; + Method m = getCommand(i); if (m == null) { m = new ExecutionSync(); @@ -337,11 +336,27 @@ public class Session extends SessionInvoker } } + private Method getCommand(int i) + { + return commands.get(i); + } + + private void setCommand(int commandId, Method command) + { + commands.put(commandId, command); + } + + private Method removeCommand(int id) + { + return commands.remove(id); + } + void dump() { - synchronized (commands) + synchronized (commandsLock) { - for (Method m : commands) + TreeMap<Integer, Method> ordered = new TreeMap<Integer, Method>(commands); + for (Method m : ordered.values()) { if (m != null) { @@ -484,7 +499,7 @@ public class Session extends SessionInvoker copy = processed.copy(); } - synchronized (commands) + synchronized (commandsLock) { if (state == DETACHED || state == CLOSING || state == CLOSED) { @@ -539,18 +554,16 @@ public class Session extends SessionInvoker { log.debug("%s complete(%d, %d)", this, lower, upper); } - synchronized (commands) + synchronized (commandsLock) { int old = maxComplete; for (int id = max(maxComplete, lower); le(id, upper); id++) { - int idx = mod(id, commands.length); - Method m = commands[idx]; + Method m = removeCommand(id); if (m != null) { commandBytes -= m.getBodySize(); m.complete(); - commands[idx] = null; } } if (le(lower, maxComplete + 1)) @@ -563,7 +576,7 @@ public class Session extends SessionInvoker log.debug("%s commands remaining: %s", this, commandsOut - maxComplete); } - commands.notifyAll(); + commandsLock.notifyAll(); return gt(maxComplete, old); } } @@ -596,7 +609,7 @@ public class Session extends SessionInvoker protected boolean isCommandsFull(int id) { - return id - maxComplete >= commands.length; + return id - maxComplete >= commandLimit; } public void invoke(Method m) @@ -613,7 +626,7 @@ public class Session extends SessionInvoker acquireCredit(); } - synchronized (commands) + synchronized (commandsLock) { if (state == DETACHED && m.isUnreliable()) { @@ -629,7 +642,7 @@ public class Session extends SessionInvoker Thread current = Thread.currentThread(); if (!current.equals(resumer) ) { - Waiter w = new Waiter(commands, timeout); + Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && (state != OPEN && state != CLOSED)) { checkFailoverRequired("Command was interrupted because of failover, before being sent"); @@ -678,7 +691,7 @@ public class Session extends SessionInvoker if (isFull(next)) { - Waiter w = new Waiter(commands, timeout); + Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && isFull(next) && state != CLOSED) { if (state == OPEN || state == RESUMING) @@ -735,7 +748,7 @@ public class Session extends SessionInvoker if ((replayTransfer) || m.hasCompletionListener()) { - commands[mod(next, commands.length)] = m; + setCommand(next, m); commandBytes += m.getBodySize(); } if (autoSync) @@ -817,7 +830,7 @@ public class Session extends SessionInvoker public void sync(long timeout) { log.debug("%s sync()", this); - synchronized (commands) + synchronized (commandsLock) { int point = commandsOut - 1; @@ -826,19 +839,13 @@ public class Session extends SessionInvoker executionSync(SYNC); } - Waiter w = new Waiter(commands, timeout); + Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { checkFailoverRequired("Session sync was interrupted by failover."); if(log.isDebugEnabled()) { - List<Method> waitingFor = - Arrays.asList(commands) - .subList(mod(maxComplete,commands.length), - mod(commandsOut-1, commands.length) < mod(maxComplete, commands.length) - ? commands.length-1 - : mod(commandsOut-1, commands.length)); - log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, waitingFor); + log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); } w.await(); } @@ -909,7 +916,7 @@ public class Session extends SessionInvoker protected <T> Future<T> invoke(Method m, Class<T> klass) { - synchronized (commands) + synchronized (commandsLock) { int command = commandsOut; ResultFuture<T> future = new ResultFuture<T>(klass); @@ -1019,7 +1026,7 @@ public class Session extends SessionInvoker { log.debug("Closing [%s] in state [%s]", this, state); } - synchronized (commands) + synchronized (commandsLock) { switch(state) { @@ -1043,7 +1050,7 @@ public class Session extends SessionInvoker protected void awaitClose() { - Waiter w = new Waiter(commands, timeout); + Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED) { checkFailoverRequired("close() was interrupted by failover."); @@ -1063,7 +1070,7 @@ public class Session extends SessionInvoker public void closed() { - synchronized (commands) + synchronized (commandsLock) { if (closing || getException() != null) { @@ -1074,7 +1081,7 @@ public class Session extends SessionInvoker state = DETACHED; } - commands.notifyAll(); + commandsLock.notifyAll(); synchronized (results) { @@ -1171,9 +1178,9 @@ public class Session extends SessionInvoker //prevent them waiting for timeout for 60 seconds //and possibly preventing failover proceeding _failoverRequired.set(true); - synchronized (commands) + synchronized (commandsLock) { - commands.notifyAll(); + commandsLock.notifyAll(); } synchronized (results) { |