diff options
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/Method.java | 13 | ||||
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/Session.java | 15 |
2 files changed, 25 insertions, 3 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Method.java b/java/common/src/main/java/org/apache/qpid/transport/Method.java index 6b99f6d5d3..09cfd119be 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -112,6 +112,19 @@ public abstract class Method extends Struct implements ProtocolEvent throw new UnsupportedOperationException(); } + public int getBodySize() + { + ByteBuffer body = getBody(); + if (body == null) + { + return 0; + } + else + { + return body.remaining(); + } + } + public abstract byte getEncodedTrack(); public abstract <C> void dispatch(C context, MethodDelegate<C> delegate); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 951370a124..f94edcc655 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -95,7 +95,9 @@ public class Session extends SessionInvoker // outgoing command count private int commandsOut = 0; - private Method[] commands = new Method[64*1024]; + private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)]; + private int commandBytes = 0; + private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024); private int maxComplete = commandsOut - 1; private boolean needSync = false; @@ -432,7 +434,13 @@ public class Session extends SessionInvoker int old = maxComplete; for (int id = max(maxComplete, lower); le(id, upper); id++) { - commands[mod(id, commands.length)] = null; + int idx = mod(id, commands.length); + Method m = commands[idx]; + if (m != null) + { + commandBytes -= m.getBodySize(); + } + commands[idx] = null; } if (le(lower, maxComplete + 1)) { @@ -462,7 +470,7 @@ public class Session extends SessionInvoker final private boolean isFull(int id) { - return id - maxComplete >= commands.length; + return id - maxComplete >= commands.length || commandBytes >= byteLimit; } public void invoke(Method m) @@ -542,6 +550,7 @@ public class Session extends SessionInvoker if (expiry > 0) { commands[mod(next, commands.length)] = m; + commandBytes += m.getBodySize(); } if (autoSync) { |