summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-02-11 19:16:48 +0000
committerRafael H. Schloming <rhs@apache.org>2009-02-11 19:16:48 +0000
commitb6f66f84691f3bd547a7d22691295e1ba93211e6 (patch)
tree62ec68b272bbaa4243a6ee2363dabe3d8748a509
parent4e66e71ceea69b37d9be26e1a35eaada0431e931 (diff)
downloadqpid-python-b6f66f84691f3bd547a7d22691295e1ba93211e6.tar.gz
QPID-1658: added a byte limit for the number of commands in the session replay buffer, and made the buffer length configurable
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743455 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java15
2 files changed, 25 insertions, 3 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
index 6b99f6d5d3..09cfd119be 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -112,6 +112,19 @@ public abstract class Method extends Struct implements ProtocolEvent
throw new UnsupportedOperationException();
}
+ public int getBodySize()
+ {
+ ByteBuffer body = getBody();
+ if (body == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return body.remaining();
+ }
+ }
+
public abstract byte getEncodedTrack();
public abstract <C> void dispatch(C context, MethodDelegate<C> delegate);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 951370a124..f94edcc655 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -95,7 +95,9 @@ public class Session extends SessionInvoker
// outgoing command count
private int commandsOut = 0;
- private Method[] commands = new Method[64*1024];
+ private Method[] commands = new Method[Integer.getInteger("qpid.session.command_limit", 64*1024)];
+ private int commandBytes = 0;
+ private int byteLimit = Integer.getInteger("qpid.session.byte_limit", 1024*1024);
private int maxComplete = commandsOut - 1;
private boolean needSync = false;
@@ -432,7 +434,13 @@ public class Session extends SessionInvoker
int old = maxComplete;
for (int id = max(maxComplete, lower); le(id, upper); id++)
{
- commands[mod(id, commands.length)] = null;
+ int idx = mod(id, commands.length);
+ Method m = commands[idx];
+ if (m != null)
+ {
+ commandBytes -= m.getBodySize();
+ }
+ commands[idx] = null;
}
if (le(lower, maxComplete + 1))
{
@@ -462,7 +470,7 @@ public class Session extends SessionInvoker
final private boolean isFull(int id)
{
- return id - maxComplete >= commands.length;
+ return id - maxComplete >= commands.length || commandBytes >= byteLimit;
}
public void invoke(Method m)
@@ -542,6 +550,7 @@ public class Session extends SessionInvoker
if (expiry > 0)
{
commands[mod(next, commands.length)] = m;
+ commandBytes += m.getBodySize();
}
if (autoSync)
{