summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-02-11 19:16:48 +0000
committerRafael H. Schloming <rhs@apache.org>2009-02-11 19:16:48 +0000
commita57a8590b2aad71438fde0b977cd928c70dcdf2f (patch)
tree0054883d20a5c35efc714bb8f73a16bec7bffbca
parent86d5aa7ccc3fb6e9cf23a52e8b2d69aae53692f7 (diff)
downloadqpid-python-a57a8590b2aad71438fde0b977cd928c70dcdf2f.tar.gz
QPID-1658: added a byte limit for the number of commands in the session replay buffer, and made the buffer length configurable
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@743455 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Method.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java15
2 files changed, 25 insertions, 3 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Method.java b/java/common/src/main/java/org/apache/qpid/transport/Method.java
index 6b99f6d5d3..09cfd119be 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -112,6 +112,19 @@ public abstract class Method extends Struct implements ProtocolEvent
throw new UnsupportedOperationException();
}
+ public int getBodySize()
+ {
+ ByteBuffer body = getBody();
+ if (body == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return body.remaining();
+ }
+ }
+
public abstract byte getEncodedTrack();
public abstract <C> void dispatch(C context, MethodDelegate<C> delegate);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 951370a124..f94edcc655 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -95,7 +95,9 @@ public class Session extends SessionInvoker
// outgoing command count
private int commandsOut = 0;
- private Method[] commands = new Method[64*1024];
+ private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)];
+ private int commandBytes = 0;
+ private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024);
private int maxComplete = commandsOut - 1;
private boolean needSync = false;
@@ -432,7 +434,13 @@ public class Session extends SessionInvoker
int old = maxComplete;
for (int id = max(maxComplete, lower); le(id, upper); id++)
{
- commands[mod(id, commands.length)] = null;
+ int idx = mod(id, commands.length);
+ Method m = commands[idx];
+ if (m != null)
+ {
+ commandBytes -= m.getBodySize();
+ }
+ commands[idx] = null;
}
if (le(lower, maxComplete + 1))
{
@@ -462,7 +470,7 @@ public class Session extends SessionInvoker
final private boolean isFull(int id)
{
- return id - maxComplete >= commands.length;
+ return id - maxComplete >= commands.length || commandBytes >= byteLimit;
}
public void invoke(Method m)
@@ -542,6 +550,7 @@ public class Session extends SessionInvoker
if (expiry > 0)
{
commands[mod(next, commands.length)] = m;
+ commandBytes += m.getBodySize();
}
if (autoSync)
{