diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-02-11 19:16:48 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-02-11 19:16:48 +0000 |
commit | b6f66f84691f3bd547a7d22691295e1ba93211e6 (patch) | |
tree | 62ec68b272bbaa4243a6ee2363dabe3d8748a509 | |
parent | 4e66e71ceea69b37d9be26e1a35eaada0431e931 (diff) | |
download | qpid-python-b6f66f84691f3bd547a7d22691295e1ba93211e6.tar.gz |
QPID-1658: added a byte limit for the number of commands in the session replay buffer, and made the buffer length configurable
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743455 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java | 13 | ||||
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java | 15 |
2 files changed, 25 insertions, 3 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java index 6b99f6d5d3..09cfd119be 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -112,6 +112,19 @@ public abstract class Method extends Struct implements ProtocolEvent throw new UnsupportedOperationException(); } + public int getBodySize() + { + ByteBuffer body = getBody(); + if (body == null) + { + return 0; + } + else + { + return body.remaining(); + } + } + public abstract byte getEncodedTrack(); public abstract <C> void dispatch(C context, MethodDelegate<C> delegate); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 951370a124..f94edcc655 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -95,7 +95,9 @@ public class Session extends SessionInvoker // outgoing command count private int commandsOut = 0; - private Method[] commands = new Method[64*1024]; + private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)]; + private int commandBytes = 0; + private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024); private int maxComplete = commandsOut - 1; private boolean needSync = false; @@ -432,7 +434,13 @@ public class Session extends SessionInvoker int old = maxComplete; for (int id = max(maxComplete, lower); le(id, upper); id++) { - commands[mod(id, commands.length)] = null; + int idx = mod(id, commands.length); + Method m = commands[idx]; + if (m != null) + { + commandBytes -= m.getBodySize(); + } + commands[idx] = null; } if (le(lower, maxComplete + 1)) { @@ -462,7 +470,7 @@ public class Session extends SessionInvoker final private boolean isFull(int id) { - return id - maxComplete >= commands.length; + return id - maxComplete >= commands.length || commandBytes >= byteLimit; } public void invoke(Method m) @@ -542,6 +550,7 @@ public class Session extends SessionInvoker if (expiry > 0) { commands[mod(next, commands.length)] = m; + commandBytes += m.getBodySize(); } if (autoSync) { |