summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-03-14 10:46:40 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-03-14 10:46:40 +0000
commit20cf766ec6465c52c56984780256791d97f481ac (patch)
tree2011d6c817c64b47bd32973c221df702c7c0d38e
parent90ddc20dc80b95f3d5dacb20418b08138cff88a1 (diff)
downloadqpid-python-20cf766ec6465c52c56984780256791d97f481ac.tar.gz
QPID-592 : Parameterised the Read/Write buffer limits. On the broker extra config [read|write]BufferLimitSize on the client System properties qpid.[read|write].buffer.limit. All the defaults are 256k(262144).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637047 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/etc/config.xml4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java16
-rw-r--r--java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java9
-rw-r--r--java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java11
5 files changed, 36 insertions, 27 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index 78d96e52fd..80ee039ee5 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -33,7 +33,9 @@
<keystorePassword>keystorepass</keystorePassword>
</ssl>-->
<qpidnio>false</qpidnio>
- <protectio>false</protectio>
+ <protectio>
+ <enabled>false</enabled>
+ </protectio>
<transport>nio</transport>
<port>5672</port>
<sslport>8672</sslport>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index db5d882f51..d8dbf97e49 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -29,10 +29,8 @@ import org.apache.mina.common.IoSession;
import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.SessionUtil;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
@@ -58,6 +56,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
private final IApplicationRegistry _applicationRegistry;
+ private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144";
+ private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144";
+
+ private final int BUFFER_READ_LIMIT_SIZE;
+ private final int BUFFER_WRITE_LIMIT_SIZE;
public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
{
@@ -67,6 +70,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
{
_applicationRegistry = applicationRegistry;
+
+ // Read the configuration from the application registry
+ BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE));
+ BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE));
+
_logger.debug("AMQPFastProtocolHandler created");
}
@@ -115,27 +123,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
}
- if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false))
+ if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false))
{
try
{
// //Add IO Protection Filters
IoFilterChain chain = protocolSession.getFilterChain();
- int buf_size = 32768;
- if (protocolSession.getConfig() instanceof SocketSessionConfig)
- {
- buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize();
- }
protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE);
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+ writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE);
writefilter.attach(chain);
protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 89982a1af0..7909663f24 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -30,7 +30,6 @@ import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -156,6 +155,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+ /** Default buffer size for pending messages reads */
+ private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
+
+ /** Default buffer size for pending messages writes */
+ private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144";
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -219,19 +224,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter
//Add IO Protection Filters
IoFilterChain chain = session.getFilterChain();
- int buf_size = 32768;
- if (session.getConfig() instanceof SocketSessionConfig)
- {
- buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize();
- }
session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+ writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT)));
writefilter.attach(chain);
session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
diff --git a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java
index 0b6ed81d18..b93dc46741 100644
--- a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java
+++ b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java
@@ -131,19 +131,20 @@ public class IOWriterClient implements Runnable
private int _receivedCount = 0;
private int _sentCount = 0;
+ private static final String DEFAULT_READ_BUFFER = "262144";
+ private static final String DEFAULT_WRITE_BUFFER = "262144";
public void sessionCreated(IoSession session) throws Exception
{
IoFilterChain chain = session.getFilterChain();
- int buf_size = ((SocketSessionConfig) session.getConfig()).getSendBufferSize();
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER)));
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
-// writefilter.setMaximumConnectionBufferCount(1000);
- writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+
+ writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER)));
writefilter.attach(chain);
}
diff --git a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java
index 82ef3d57cc..423e98c67b 100644
--- a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java
+++ b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java
@@ -39,6 +39,9 @@ public class IOWriterServer
static public int _PORT = 9999;
+ private static final String DEFAULT_READ_BUFFER = "262144";
+ private static final String DEFAULT_WRITE_BUFFER = "262144";
+
private static class TestHandler extends IoHandlerAdapter
{
@@ -52,14 +55,14 @@ public class IOWriterServer
{
IoFilterChain chain = ioSession.getFilterChain();
- int buf_size = ((SocketSessionConfig) ioSession.getConfig()).getReceiveBufferSize();
-
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER)));
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+
+ writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER)));
+
writefilter.attach(chain);
}